private void startJob()

in src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java [312:404]


    private void startJob(final JobHandler handler) {
        try {
            this.closeMarker.set(false);
            try {
                final JobImpl job = handler.getJob();
                handler.started = System.currentTimeMillis();

                this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
                // sanity check for the queued property
                Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
                if ( queued == null ) {
                    // we simply use a date of ten seconds ago
                    queued = Calendar.getInstance();
                    queued.setTimeInMillis(System.currentTimeMillis() - 10000);
                }
                final long queueTime = handler.started - queued.getTimeInMillis();
                // update statistics
                this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
                // send notification
                NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);

                synchronized ( this.processingJobsLists ) {
                    this.processingJobsLists.put(job.getId(), handler);
                }

                JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
                Job.JobState resultState = Job.JobState.ERROR;
                final JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler() {

                    @Override
                    public void finished(final JobState state) {
                        services.jobConsumerManager.unregisterListener(job.getId());
                        finishedJob(job.getId(), state, true);
                        asyncCounter.decrementAndGet();
                    }
                });

                try {
                    synchronized ( ctx ) {
                        result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
                        if ( result == null ) { // ASYNC processing
                            services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
                            asyncCounter.incrementAndGet();
                            ctx.markAsync();
                        } else {
                            if ( result.succeeded() ) {
                                resultState = Job.JobState.SUCCEEDED;
                            } else if ( result.failed() ) {
                                resultState = Job.JobState.QUEUED;
                            } else if ( result.cancelled() ) {
                                if ( handler.isStopped() ) {
                                    resultState = Job.JobState.STOPPED;
                                } else {
                                    resultState = Job.JobState.ERROR;
                                }
                            }
                        }
                    }
                } catch (final Throwable t) { //NOSONAR
                    logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
                    // we don't reschedule if an exception occurs
                    result = JobExecutionResultImpl.CANCELLED;
                    resultState = Job.JobState.ERROR;
                } finally {
                    if ( result != null ) {
                        if ( result.getRetryDelayInMs() != null ) {
                            job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
                        }
                        if ( result.getMessage() != null ) {
                           job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
                        }
                        this.finishedJob(job.getId(), resultState, false);
                    }
                }
            } catch (final Exception re) {
                // if an exception occurs, we just log
                this.logger.error("Exception during job processing.", re);
            }
        } finally {
            // try draining first
            if (this.drainage.tryAcquire()) {
                // special case : if drainage is used, this means maxparallel
                // got reconfigured and we are not releasing a permit to
                // available here, but instead reduce drainage.
                final int approxPermits = this.drainage.availablePermits();
                this.logger.debug("startJobHandler: drained 1 permit for {}, approx left to drain: {}",
                        queueName, approxPermits);
            } else {
                // otherwise release as usual
                this.available.release();
            }
        }
    }