in src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java [158:212]
public JobHandler getNextJob(final JobConsumerManager jobConsumerManager,
final StatisticsManager statisticsManager,
final Queue queue,
final boolean doFull) {
JobHandler handler = null;
if ( !this.queueIsBlocked.get() ) {
synchronized ( this.cache ) {
boolean retry;
do {
retry = false;
if ( this.cache.isEmpty() ) {
final Set<String> checkingTopics = new HashSet<String>();
synchronized ( this.topicsWithNewJobs ) {
checkingTopics.addAll(this.topicsWithNewJobs);
this.topicsWithNewJobs.clear();
}
if ( doFull ) {
checkingTopics.addAll(this.topics);
}
checkingTopics.removeAll(newlyHaltedTopics);
if ( !checkingTopics.isEmpty() ) {
this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
}
}
if ( !this.cache.isEmpty() ) {
final JobImpl job = this.cache.remove(0);
final JobExecutor consumer = jobConsumerManager.getExecutor(job.getTopic());
handler = new JobHandler(job, consumer, this.configuration);
if ( consumer != null ) {
if ( !handler.startProcessing(queue) ) {
statisticsManager.jobDequeued(queue.getName(), handler.getJob().getTopic());
if ( logger.isDebugEnabled() ) {
logger.debug("Discarding removed job {}", Utility.toString(job));
}
handler = null;
retry = true;
}
} else {
statisticsManager.jobDequeued(queue.getName(), handler.getJob().getTopic());
// no consumer on this instance, assign to another instance
handler.reassign();
handler = null;
retry = true;
}
}
} while ( handler == null && retry);
}
}
return handler;
}