in src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java [135:200]
public DistributionSubscriber(
@Reference(name = "packageBuilder") DistributionPackageBuilder packageBuilder,
@Reference SlingSettingsService slingSettings,
@Reference MessagingProvider messagingProvider,
@Reference(name = "precondition") Precondition precondition,
@Reference MetricsService metricsService,
@Reference BookKeeperFactory bookKeeperFactory,
@Reference SubscriberReadyStore subscriberReadyStore,
@Reference OnlyOnLeader onlyOnLeader,
SubscriberConfiguration config, BundleContext context, Map<String, Object> properties
) {
String subSlingId = requireNonNull(slingSettings.getSlingId());
subAgentName = requireNotBlank(config.name());
this.precondition = requireNonNull(precondition);
this.subscriberMetrics = new SubscriberMetrics(metricsService, subAgentName, getFirst(config.agentNames()), config.editable());
if (config.subscriberIdleCheck()) {
AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
idleCheck = new SubscriberReady(subAgentName, config.idleMillies(), config.forceReadyMillies(), config.acceptableAgeDiffMs(), readyHolder, System::currentTimeMillis);
idleReadyCheck = new SubscriberIdleCheck(context, idleCheck, config.subscriberIdleTags());
} else {
idleCheck = new NoopIdle();
}
queueNames = getNotEmpty(config.agentNames());
pkgType = requireNonNull(packageBuilder.getType());
Consumer<PackageStatusMessage> statusSender = messagingProvider.createSender(Topics.STATUS_TOPIC);
Consumer<LogMessage> logSender = messagingProvider.createSender(Topics.DISCOVERY_TOPIC);
String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), Topics.PACKAGE_TOPIC);
String commandNodeName = escapeTopicName(messagingProvider.getServerUri(), Topics.COMMAND_TOPIC);
BookKeeperConfig bkConfig = new BookKeeperConfig(
subAgentName,
subSlingId,
config.editable(),
config.maxRetries(),
config.packageHandling(),
packageNodeName,
commandNodeName,
config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender, this.subscriberMetrics);
if (config.editable()) {
Consumer<Long> clearHandler = (Long offset) -> {
bookKeeper.storeClearOffset(offset);
delay.signal();
};
commandPoller = new CommandPoller(messagingProvider, subSlingId, subAgentName, bookKeeper.getClearOffset(), clearHandler);
}
long startOffset = bookKeeper.loadOffset() + 1;
String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;
packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, Reset.latest, assign,
HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage));
int announceDelay = Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class);
announcer = new Announcer(subSlingId, subAgentName, queueNames,
messagingProvider.createSender(Topics.DISCOVERY_TOPIC), bookKeeper,
config.maxRetries(), config.editable(), announceDelay);
LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}", subAgentName, startOffset,
queueNames, config.subscriberIdleCheck());
}