in src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java [312:404]
/**
 * Executes the job held by the given handler through its consumer and,
 * whatever the outcome, gives the processing slot back afterwards.
 * <p>
 * Steps performed, in order:
 * <ol>
 *   <li>reset the close marker and stamp the handler with the start time;</li>
 *   <li>derive the time the job spent queued (falling back to "ten seconds ago"
 *       when the queued property is missing), then report it to the statistics
 *       manager and via a {@code TOPIC_JOB_STARTED} notification;</li>
 *   <li>record the handler in the map of jobs currently being processed;</li>
 *   <li>call the consumer. A {@code null} result is treated as asynchronous
 *       processing: a listener is registered, the async counter is incremented
 *       and the context is marked async. Any other result is translated into a
 *       job state and the job is finished right away;</li>
 *   <li>in the outer {@code finally}, either consume one drainage permit or
 *       release one permit on {@code available}.</li>
 * </ol>
 * Exceptions are logged and never propagate to the caller.
 *
 * @param handler the handler giving access to the job and to its consumer
 */
private void startJob(final JobHandler handler) {
    try {
        this.closeMarker.set(false);
        try {
            final JobImpl job = handler.getJob();
            handler.started = System.currentTimeMillis();
            this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());

            // The queued timestamp may be absent; substitute a point ten seconds in the past.
            Calendar queuedAt = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
            if ( queuedAt == null ) {
                queuedAt = Calendar.getInstance();
                queuedAt.setTimeInMillis(System.currentTimeMillis() - 10000);
            }
            final long waitedMillis = handler.started - queuedAt.getTimeInMillis();

            // Statistics first, then the "job started" notification.
            this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), waitedMillis);
            NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, waitedMillis);

            synchronized ( this.processingJobsLists ) {
                this.processingJobsLists.put(job.getId(), handler);
            }

            // Defaults that apply when the consumer fails with an exception or
            // returns a result that is neither succeeded, failed nor cancelled.
            JobExecutionResultImpl outcome = JobExecutionResultImpl.CANCELLED;
            Job.JobState finalState = Job.JobState.ERROR;

            // Invoked once an asynchronously processed job reports completion.
            final JobExecutionContextImpl.ASyncHandler asyncCallback = new JobExecutionContextImpl.ASyncHandler() {
                @Override
                public void finished(final JobState state) {
                    services.jobConsumerManager.unregisterListener(job.getId());
                    finishedJob(job.getId(), state, true);
                    asyncCounter.decrementAndGet();
                }
            };
            final JobExecutionContextImpl context = new JobExecutionContextImpl(handler, asyncCallback);

            try {
                // NOTE(review): the lock on the context presumably keeps an async completion
                // from being handled before markAsync() has run - confirm in JobExecutionContextImpl.
                synchronized ( context ) {
                    outcome = (JobExecutionResultImpl)handler.getConsumer().process(job, context);
                    if ( outcome == null ) {
                        // no result: the consumer processes the job asynchronously
                        services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), context);
                        asyncCounter.incrementAndGet();
                        context.markAsync();
                    } else if ( outcome.succeeded() ) {
                        finalState = Job.JobState.SUCCEEDED;
                    } else if ( outcome.failed() ) {
                        // a failed result puts the job back into the QUEUED state
                        finalState = Job.JobState.QUEUED;
                    } else if ( outcome.cancelled() ) {
                        finalState = handler.isStopped() ? Job.JobState.STOPPED : Job.JobState.ERROR;
                    }
                }
            } catch (final Throwable error) { //NOSONAR
                logger.error("Unhandled error occured in job processor " + error.getMessage() + " while processing job " + Utility.toString(job), error);
                // an exception never leads to a reschedule
                outcome = JobExecutionResultImpl.CANCELLED;
                finalState = Job.JobState.ERROR;
            } finally {
                // A null outcome means async processing; the callback finishes the job later.
                if ( outcome != null ) {
                    if ( outcome.getRetryDelayInMs() != null ) {
                        job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, outcome.getRetryDelayInMs());
                    }
                    if ( outcome.getMessage() != null ) {
                        job.setProperty(Job.PROPERTY_RESULT_MESSAGE, outcome.getMessage());
                    }
                    this.finishedJob(job.getId(), finalState, false);
                }
            }
        } catch (final Exception ex) {
            // nothing to recover here, logging is all we do
            this.logger.error("Exception during job processing.", ex);
        }
    } finally {
        // Draining takes precedence over handing the permit back.
        if ( !this.drainage.tryAcquire() ) {
            // regular case: return the permit
            this.available.release();
        } else {
            // special case : a drainage permit was pending, which means maxparallel
            // got reconfigured; instead of releasing a permit to available we
            // reduce drainage by one.
            final int remainingToDrain = this.drainage.availablePermits();
            this.logger.debug("startJobHandler: drained 1 permit for {}, approx left to drain: {}",
                    queueName, remainingToDrain);
        }
    }
}