public JobHandler getNextJob()

in src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java [158:212]


    public JobHandler getNextJob(final JobConsumerManager jobConsumerManager,
            final StatisticsManager statisticsManager,
            final Queue queue,
            final boolean doFull) {
        JobHandler handler = null;

        if ( !this.queueIsBlocked.get() ) {
            synchronized ( this.cache ) {
                boolean retry;
                do {
                    retry = false;
                    if ( this.cache.isEmpty() ) {
                        final Set<String> checkingTopics = new HashSet<String>();
                        synchronized ( this.topicsWithNewJobs ) {
                            checkingTopics.addAll(this.topicsWithNewJobs);
                            this.topicsWithNewJobs.clear();
                        }
                        if ( doFull ) {
                            checkingTopics.addAll(this.topics);
                        }
                        checkingTopics.removeAll(newlyHaltedTopics);
                        if ( !checkingTopics.isEmpty() ) {
                            this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
                        }
                    }

                    if ( !this.cache.isEmpty() ) {
                        final JobImpl job = this.cache.remove(0);
                        final JobExecutor consumer = jobConsumerManager.getExecutor(job.getTopic());

                        handler = new JobHandler(job, consumer, this.configuration);
                        if ( consumer != null ) {
                            if ( !handler.startProcessing(queue) ) {
                                statisticsManager.jobDequeued(queue.getName(), handler.getJob().getTopic());
                                if ( logger.isDebugEnabled() ) {
                                    logger.debug("Discarding removed job {}", Utility.toString(job));
                                }
                                handler = null;
                                retry = true;
                            }
                        } else {
                            statisticsManager.jobDequeued(queue.getName(), handler.getJob().getTopic());
                            // no consumer on this instance, assign to another instance
                            handler.reassign();

                            handler = null;
                            retry = true;
                        }

                    }
                } while ( handler == null && retry);
            }
        }
        return handler;
    }