private List loadJobs()

in src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java [279:335]


    private List<JobImpl> loadJobs(final String queueName, final String topic,
            final Resource topicResource,
            final StatisticsManager statisticsManager) {
        logger.debug("Loading jobs from topic {}", topic);
        final List<JobImpl> list = new ArrayList<JobImpl>();

        final AtomicBoolean scanTopic = new AtomicBoolean(false);
        final AtomicBoolean haltTopic = new AtomicBoolean(false);

        JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {

            @Override
            public boolean handle(final JobImpl job) {
                if ( job.getProcessingStarted() == null && !job.hasReadErrors() ) {
                    list.add(job);
                    statisticsManager.jobQueued(queueName, topic);
                    if ( list.size() == maxPreloadLimit ) {
                        scanTopic.set(true);
                    }
                } else if ( job.getProcessingStarted() != null ) {
                    logger.debug("Ignoring job {} - processing already started.", job);
                } else {
                    // error reading job
                    switch( job.getReadErrorType() ) {
                    case CLASSNOTFOUNDEXCEPTION : {
                        haltTopic.set(true);
                        break;
                    }
                    case RUNTIMEEXCEPTION : {
                        scanTopic.set(true);
                        logger.debug("Failing job {} due to unrecoverable read errors.", job);
                        final JobHandler handler = new JobHandler(job, null, configuration);
                        handler.finished(JobState.ERROR, true, null);
                        break;
                    }
                    default: {
                        scanTopic.set(true);
                        logger.debug("Ignoring job {} due to recoverable read errors.", job);
                    }
                    }
                }
                return list.size() < maxPreloadLimit;
            }
        });
        if ( haltTopic.get() ) {
            synchronized ( this.topicsWithNewJobs ) {
                this.newlyHaltedTopics.add(topic);
            }
        } else if ( scanTopic.get() ) {
            synchronized ( this.topicsWithNewJobs ) {
                this.topicsWithNewJobs.add(topic);
            }
        }
        logger.debug("Caching {} jobs for topic {}", list.size(), topic);

        return list;
    }