public void onMessage()

in src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java [86:115]


    /**
     * Handles a message taken from a job queue: rebuilds the {@link Job} described by the message,
     * hands it to the job manager for execution, and republishes every state change of that job
     * through the topic manager.
     *
     * <p>Two guards are installed around the execution:</p>
     * <ul>
     *   <li>Intermediate updates must target the job being executed and must use one of the
     *       {@code ALLOWED_COMMANDS}; anything else is rejected.</li>
     *   <li>The final job state must carry the same ID as the job that was started.</li>
     * </ul>
     *
     * <p>Job IDs are compared by value with {@code equals()}. The job is reconstructed from the
     * message map, so the ID of an incoming update is not guaranteed to be the same {@code String}
     * instance as the ID held by the job, even when both denote the same job.</p>
     *
     * @param queueName the queue the message was taken from; not used by this implementation.
     * @param message the message payload describing the job to execute.
     * @throws RequeueMessageException declared by the method signature; nothing in this method body
     *         throws it directly (NOTE(review): presumably raised by the execution path when the
     *         message should be requeued - confirm against the JobConsumer implementation).
     */
    public void onMessage(Types.QueueName queueName, Map<String, Object> message) throws RequeueMessageException {
        final Job job = new JobImpl(new JobUpdateImpl(message));

        ((JobConsumer)jobManager).execute(job, new JobUpdateListener() {
            @Override
            public void update(@NotNull JobUpdate update) {
                // Compare IDs by value: reference comparison (!=) would reject a legitimate update
                // whose ID is an equal but distinct String instance.
                if (!job.getId().equals(update.getId()) || !ALLOWED_COMMANDS.contains(update.getCommand())) {
                    throw new IllegalArgumentException("Not allowed to update other jobs or issue reserved commands when updating the state of a running job.");
                }
                topicManager.publish(update.getQueue().asTopicName(), update.getCommand().asCommandName(), Utils.toMapValue(update));
            }
        }, new JobCallback() {
            @Override
            public void callback(Job finalJobState) {
                // Same value-based ID check as for intermediate updates.
                if (!job.getId().equals(finalJobState.getId())) {
                    throw new IllegalArgumentException("Final Job state ID must match initial JobState ID");
                }
                // Publish the complete final state as an UPDATE_JOB command carrying all job properties.
                JobUpdate finalJobUpdate = finalJobState.newJobUpdateBuilder()
                        .command(JobUpdate.JobUpdateCommand.UPDATE_JOB)
                        .putAll(finalJobState.getProperties())
                        .build();
                topicManager.publish(finalJobUpdate.getQueue().asTopicName(), finalJobUpdate.getCommand().asCommandName(), Utils.toMapValue(finalJobUpdate));
            }
        });
    }