private void processOffsets()

in src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java [95:109]


    private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
        long minOffset = offsets.get().findFirst().getAsLong();

        if (ensureEvent) {
            long lastDistributedOffset = lastDistributedOffsets.computeIfAbsent(pubAgentName, this::getLastStoredDistributedOffset);
            minOffset = Math.min(offsets.get().findFirst().getAsLong(), lastDistributedOffset);
        }

        OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
        offsets
            .get()
            .mapToObj(offsetQueue::getItem)
            .filter(Objects::nonNull)
            .forEach(msg -> notifyDistributed(pubAgentName, msg));
    }