in src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java [318:340]
private void processPackageMessage(MessageInfo info, PackageMessage pkgMsg)
throws PersistenceException, LoginException, DistributionException, ImportPostProcessException, InterruptedException {
blockingSendStoredStatus();
boolean skip = shouldRemove(info.getOffset());
PackageMessage.ReqType type = pkgMsg.getReqType();
try {
this.state.set(DistributionAgentState.RUNNING);
idleCheck.busy(bookKeeper.getRetries(pkgMsg.getPubAgentName()), info.getCreateTime());
Date importStartTime = new Date();
Date createdTime = new Date(info.getCreateTime());
if (skip) {
bookKeeper.removePackage(pkgMsg, info.getOffset());
} else if (type == INVALIDATE) {
bookKeeper.invalidateCache(pkgMsg, info.getOffset(), createdTime, importStartTime);
} else {
bookKeeper.importPackage(pkgMsg, info.getOffset(), createdTime, importStartTime);
}
blockingSendStoredStatus();
} finally {
idleCheck.idle();
this.state.set(DistributionAgentState.IDLE);
}
}