in src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java [86:115]
public void onMessage(Types.QueueName queueName, Map<String, Object> message) throws RequeueMessageException {
final Job job = new JobImpl(new JobUpdateImpl(message));
((JobConsumer)jobManager).execute(job, new JobUpdateListener() {
@Override
public void update(@NotNull JobUpdate update) {
if (update.getId() != job.getId() || !ALLOWED_COMMANDS.contains(update.getCommand())) {
throw new IllegalArgumentException("Not allowed to update other jobs or issue reserved commands when updating the state of a running job.");
}
topicManager.publish(update.getQueue().asTopicName(), update.getCommand().asCommandName(), Utils.toMapValue(update));
}
}, new JobCallback() {
@Override
public void callback(Job finalJobState) {
if (finalJobState.getId() != job.getId()) {
throw new IllegalArgumentException("Final Job state ID must match initial JobState ID");
}
JobUpdate finalJobUpdate = finalJobState.newJobUpdateBuilder()
.command(JobUpdate.JobUpdateCommand.UPDATE_JOB)
.putAll(finalJobState.getProperties())
.build();
topicManager.publish(finalJobUpdate.getQueue().asTopicName(), finalJobUpdate.getCommand().asCommandName(), Utils.toMapValue(finalJobUpdate));
}
});
}