in src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java [250:281]
public boolean tryProcess(MessageInfo info, PackageMessage message) {
if (shouldSkip(info, message)) {
try {
bookKeeper.skipPackage(info.getOffset());
} catch (PersistenceException | LoginException e) {
LOG.warn("Error marking distribution package {} at offset={} as skipped", message, info.getOffset(), e);
}
return true;
}
subscriberMetrics.getPackageJournalDistributionDuration()
.update((currentTimeMillis() - info.getCreateTime()), TimeUnit.MILLISECONDS);
try {
processPackageMessage(info, message);
catchAllDelay = catchAllDelays.get();
} catch (PreConditionTimeoutException e) {
// Precondition timed out. We only log this on info level as it is no error
LOG.info(e.getMessage());
delay.await(RETRY_DELAY_MILLIS);
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug(e.getMessage());
} catch (Exception e) {
// Catch all to prevent processing from stopping
LOG.error("Error processing queue item", e);
delay.await(catchAllDelay.getAsLong());
return false;
} finally {
announcer.run();
}
return true;
}