in src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java [155:186]
public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime, Date importStartTime) throws DistributionException {
    // Imports a single package message read at the given offset:
    //   pre-process -> apply -> store status/offset -> commit -> metrics -> post-process -> event.
    // Every exception type caught below is handed to failure(...), and the
    // "current import" metric is cleared on every exit path.
    log.debug("Importing distribution package {} at offset={}", pkgMsg, offset);
    try (Timer.Context importTimer = subscriberMetrics.getImportedPackageDuration().time();
            ResourceResolver resolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
        // The pre-processor runs before the import is registered as "current" in the metrics.
        preProcess(pkgMsg);

        CurrentImportInfo importInfo = new CurrentImportInfo(pkgMsg, offset, importStartTime.getTime());
        subscriberMetrics.setCurrentImport(importInfo);

        // Apply the package, then write the status (editable configs only) and the
        // offset through the same resolver ahead of the single commit() below.
        packageHandler.apply(resolver, pkgMsg);
        if (config.isEditable()) {
            PackageStatus importedStatus = new PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName());
            storeStatus(resolver, importedStatus);
        }
        storeOffset(resolver, offset);
        resolver.commit();

        // Size and creation-to-now duration are recorded only once the commit has gone through.
        subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
        subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime.getTime()), TimeUnit.MILLISECONDS);

        // The post-processor runs after the commit; an exception here still reaches the catch below.
        postProcess(pkgMsg);
        clearPackageRetriesOnSuccess(pkgMsg);

        eventAdmin.postEvent(new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent());

        Duration importDuration = Duration.ofMillis(System.currentTimeMillis() - importStartTime.getTime());
        log.info("Imported distribution package {} at offset={} took importDurationMs={} created={}", pkgMsg, offset, importDuration.toMillis(), createdTime);
        subscriberMetrics.getPackageStatusCounter(pkgMsg.getPubAgentName(), Status.IMPORTED).increment();
    } catch (DistributionException | LoginException | IOException | RuntimeException | ImportPreProcessException | ImportPostProcessException importError) {
        // NOTE(review): failure(...) is defined outside this view; presumably it decides
        // between retrying and skipping and may rethrow as DistributionException — confirm.
        failure(pkgMsg, offset, createdTime, importError);
    } finally {
        subscriberMetrics.clearCurrentImport();
    }
}