in src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java [95:109]
/**
 * Sends a distributed notification for every queue item found at the given offsets
 * of one publish agent.
 *
 * <p>The offset queue is looked up starting at the first supplied offset. When
 * {@code ensureEvent} is set, the lookup instead starts at the lesser of that offset
 * and the last distributed offset tracked for the agent (loaded on first use through
 * {@code getLastStoredDistributedOffset}). Offsets for which the queue returns no item
 * are skipped.
 *
 * <p>If the supplier yields an empty stream there is nothing to notify and the method
 * returns without touching the queue cache.
 *
 * @param pubAgentName name of the publish agent whose queue is inspected
 * @param offsets supplier of the offsets to process; it is invoked more than once because
 *                a stream can only be consumed a single time, so every invocation must
 *                return a fresh stream
 */
private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
    // Fully qualified so that this method does not require an additional import.
    java.util.OptionalLong firstOffset = offsets.get().findFirst();
    if (!firstOffset.isPresent()) {
        // Guard against an empty stream: getAsLong() would throw NoSuchElementException.
        return;
    }

    // NOTE(review): the first element is used as the minimum, which presumably relies on
    // the supplied offsets being in ascending order - confirm against the callers.
    long minOffset = firstOffset.getAsLong();
    if (ensureEvent) {
        long lastDistributedOffset = lastDistributedOffsets.computeIfAbsent(pubAgentName, this::getLastStoredDistributedOffset);
        // Reuse the offset read above rather than re-evaluating the supplier.
        minOffset = Math.min(minOffset, lastDistributedOffset);
    }

    OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
    offsets
        .get()
        .mapToObj(offsetQueue::getItem)
        // getItem may yield null for an offset; such offsets are not notified.
        .filter(Objects::nonNull)
        .forEach(msg -> notifyDistributed(pubAgentName, msg));
}