in src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java [112:168]
/**
 * Enables processing for the given queues.
 * <p>
 * When checkpointing is enabled, each queue is first re-populated from its
 * {@code <queueName>-checkpoint} file found in the checkpoint directory (if any), and a
 * recurring checkpoint job is scheduled for it. Afterwards a recurring processing job is
 * scheduled for every queue, regardless of the checkpoint setting.
 * <p>
 * Failures while reading or parsing a checkpoint file are logged and do not prevent the
 * scheduling of the checkpoint and processing jobs.
 *
 * @param queueProcessor the processor handed to each scheduled queue processing job
 * @param queueNames     the names of the queues to enable
 */
public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String... queueNames) {
    if (checkpoint) {
        // recover from checkpoints
        QueueItemMapper mapper = new QueueItemMapper();
        log.debug("recovering from checkpoints if needed");
        for (final String queueName : queueNames) {
            log.debug("recovering for queue {}", queueName);
            DistributionQueue queue = getQueue(queueName);
            final String checkpointFileName = queueName + "-checkpoint";
            FilenameFilter filenameFilter = (dir, name) -> name.equals(checkpointFileName);
            // File#listFiles returns null (not an empty array) when the directory is missing,
            // is not a directory, or cannot be read; treat that as "nothing to recover"
            // instead of failing with a NullPointerException before any job is scheduled.
            File[] checkpointFiles = checkpointDirectory.listFiles(filenameFilter);
            if (checkpointFiles == null) {
                log.warn("could not list checkpoint files in {}", checkpointDirectory);
                continue;
            }
            for (File qf : checkpointFiles) {
                log.info("recovering from checkpoint {}", qf);
                // the reader is closed by try-with-resources, which also releases the line iterator's source
                try (FileReader fr = new FileReader(qf)) {
                    LineIterator lineIterator = IOUtils.lineIterator(fr);
                    while (lineIterator.hasNext()) {
                        String line = lineIterator.nextLine();
                        DistributionQueueItem item = mapper.readQueueItem(line);
                        queue.add(item);
                    }
                    log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
                } catch (FileNotFoundException e) {
                    // the exception is passed as last argument so that the cause is not lost
                    log.warn("could not read checkpoint file {}", qf.getAbsolutePath(), e);
                } catch (JsonException e) {
                    log.warn("could not parse info from checkpoint file {}", qf.getAbsolutePath(), e);
                } catch (IOException e) {
                    log.warn("IO error on checkpoint file {}", qf.getAbsolutePath(), e);
                }
            }
        }
        // enable checkpointing
        // NOTE(review): NOW(-1, 15) presumably means "repeat indefinitely, every 15 seconds" - confirm against the Scheduler API
        for (String queueName : queueNames) {
            ScheduleOptions options = scheduler.NOW(-1, 15).canRunConcurrently(false)
                    .name(getJobName(queueName + "-checkpoint"));
            scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory),
                    options);
        }
    }
    // enable processing
    for (String queueName : queueNames) {
        ScheduleOptions options = scheduler.NOW(-1, 1)
                .canRunConcurrently(false)
                .name(getJobName(queueName));
        // look the queue up once and reuse it for both the attempt recorder and the processor job
        DistributionQueue queueImpl = getQueue(queueName);
        Consumer<DistributionQueueEntry> processingAttemptRecorder =
                ((SimpleDistributionQueue) queueImpl)::recordProcessingAttempt;
        scheduler.schedule(new SimpleDistributionQueueProcessor(queueImpl, queueProcessor, processingAttemptRecorder),
                options);
    }
}