in src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java [279:335]
/**
 * Collect the cacheable jobs stored below a topic resource.
 * <p>
 * Every job handed to the traversal callback is classified as follows:
 * <ul>
 *   <li>processing already started: the job is skipped;</li>
 *   <li>not started and without read errors: the job is added to the result
 *       and reported through {@code statisticsManager.jobQueued(queueName, topic)};</li>
 *   <li>read error {@code CLASSNOTFOUNDEXCEPTION}: the topic is flagged as halted;</li>
 *   <li>read error {@code RUNTIMEEXCEPTION}: the job is finished with
 *       {@code JobState.ERROR} and the topic is flagged for another scan;</li>
 *   <li>any other read error: the job is skipped and the topic is flagged
 *       for another scan.</li>
 * </ul>
 * The topic is also flagged for another scan when the number of collected jobs
 * reaches {@code maxPreloadLimit}. The callback returns {@code false} from that
 * point on (presumably this ends the traversal - see {@code JobTopicTraverser}).
 * <p>
 * After the traversal a halted topic is added to {@code newlyHaltedTopics};
 * otherwise a topic flagged for another scan is added to {@code topicsWithNewJobs}.
 * Both additions happen while holding the monitor of {@code topicsWithNewJobs}.
 *
 * @param queueName name of the queue, only used for the statistics call
 * @param topic the topic being loaded
 * @param topicResource the resource handed to the traverser for this topic
 * @param statisticsManager receives one {@code jobQueued} call per collected job
 * @return the collected jobs in the order the callback received them, possibly empty
 */
private List<JobImpl> loadJobs(final String queueName, final String topic,
        final Resource topicResource,
        final StatisticsManager statisticsManager) {
    logger.debug("Loading jobs from topic {}", topic);

    final List<JobImpl> loaded = new ArrayList<JobImpl>();
    // flags are written from inside the anonymous callback, hence the atomic holders
    final AtomicBoolean rescanNeeded = new AtomicBoolean(false);
    final AtomicBoolean haltNeeded = new AtomicBoolean(false);

    final JobTopicTraverser.JobCallback collector = new JobTopicTraverser.JobCallback() {

        @Override
        public boolean handle(final JobImpl job) {
            if ( job.getProcessingStarted() != null ) {
                logger.debug("Ignoring job {} - processing already started.", job);
            } else if ( !job.hasReadErrors() ) {
                loaded.add(job);
                statisticsManager.jobQueued(queueName, topic);
                if ( loaded.size() == maxPreloadLimit ) {
                    // limit reached: flag the topic so it is scanned again
                    rescanNeeded.set(true);
                }
            } else {
                // the job could not be read properly
                switch ( job.getReadErrorType() ) {
                    case CLASSNOTFOUNDEXCEPTION:
                        haltNeeded.set(true);
                        break;
                    case RUNTIMEEXCEPTION:
                        rescanNeeded.set(true);
                        logger.debug("Failing job {} due to unrecoverable read errors.", job);
                        new JobHandler(job, null, configuration).finished(JobState.ERROR, true, null);
                        break;
                    default:
                        rescanNeeded.set(true);
                        logger.debug("Ignoring job {} due to recoverable read errors.", job);
                        break;
                }
            }
            return loaded.size() < maxPreloadLimit;
        }
    };
    JobTopicTraverser.traverse(logger, topicResource, collector);

    // halting takes precedence over a rescan request
    final boolean halted = haltNeeded.get();
    if ( halted || rescanNeeded.get() ) {
        synchronized ( this.topicsWithNewJobs ) {
            if ( halted ) {
                this.newlyHaltedTopics.add(topic);
            } else {
                this.topicsWithNewJobs.add(topic);
            }
        }
    }

    logger.debug("Caching {} jobs for topic {}", loaded.size(), topic);
    return loaded;
}