public void enableQueueProcessing()

in src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java [112:168]


    public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String... queueNames) {

        if (checkpoint) {
            // recover from checkpoints
            QueueItemMapper mapper = new QueueItemMapper();
            log.debug("recovering from checkpoints if needed");
            for (final String queueName : queueNames) {
                log.debug("recovering for queue {}", queueName);
                DistributionQueue queue = getQueue(queueName);
                FilenameFilter filenameFilter = new FilenameFilter() {
                    @Override
                    public boolean accept(File file, String name) {
                        return name.equals(queueName + "-checkpoint");
                    }
                };
                for (File qf : checkpointDirectory.listFiles(filenameFilter)) {
                    log.info("recovering from checkpoint {}", qf);
                    try (FileReader fr = new FileReader(qf)) {
                        LineIterator lineIterator = IOUtils.lineIterator(fr);
                        while (lineIterator.hasNext()) {
                            String line = lineIterator.nextLine();
                            DistributionQueueItem item = mapper.readQueueItem(line);
                            queue.add(item);
                        }
                        log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
                    } catch (FileNotFoundException e) {
                        log.warn("could not read checkpoint file {}", qf.getAbsolutePath());
                    } catch (JsonException e) {
                        log.warn("could not parse info from checkpoint file {}", qf.getAbsolutePath());
                    } catch (IOException e) {
                        log.warn("IO error on checkpoint file {}", qf.getAbsolutePath());
                    }
                }
            }

            // enable checkpointing
            for (String queueName : queueNames) {
                ScheduleOptions options = scheduler.NOW(-1, 15).canRunConcurrently(false)
                        .name(getJobName(queueName + "-checkpoint"));
                scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory),
                        options);
            }
        }

        // enable processing
        for (String queueName : queueNames) {
            ScheduleOptions options = scheduler.NOW(-1, 1)
                    .canRunConcurrently(false)
                    .name(getJobName(queueName));
            DistributionQueue queueImpl = getQueue(queueName);
            Consumer<DistributionQueueEntry> processingAttemptRecorder =
                    ((SimpleDistributionQueue)queueImpl)::recordProcessingAttempt;
            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor, processingAttemptRecorder),
                    options);
        }

    }