in src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java [165:225]
/**
 * Activates the subscriber agent: validates the injected collaborators, builds the
 * optional command poller and idle check, creates the book keeper, starts polling the
 * package topic, launches the queue processing thread and creates the announcer.
 *
 * @param config     typed subscriber configuration (agent name, agent names to subscribe to,
 *                   editable flag, retry and package handling settings)
 * @param context    bundle context handed to the idle readiness check
 * @param properties raw component properties; read for the unofficial keys
 *                   {@code idleMillies} and {@code announceDelay}
 * @throws NullPointerException if any argument or required collaborator is {@code null}
 */
public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
    // Validate everything up front, before any argument or collaborator is dereferenced
    // and before any field of this component is assigned.
    requireNonNull(config);
    requireNonNull(context);
    requireNonNull(properties);
    requireNonNull(packageBuilder);
    requireNonNull(slingSettings);
    requireNonNull(messagingProvider);
    requireNonNull(topics);
    requireNonNull(precondition);
    requireNonNull(bookKeeperFactory);

    String subSlingId = requireNonNull(slingSettings.getSlingId());
    subAgentName = requireNotBlank(config.name());

    if (config.editable()) {
        commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, delay::signal);
    }

    if (config.subscriberIdleCheck()) {
        // Unofficial config (currently just for test)
        // Parsed with PropertiesUtil (like announceDelay below) so that a value supplied
        // as Long or String does not fail with a ClassCastException.
        int idleMillies = PropertiesUtil.toInteger(properties.get("idleMillies"),
                SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
        AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
        idleCheck = new SubscriberIdle(idleMillies, SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, readyHolder);
        idleReadyCheck = new SubscriberIdleCheck(context, idleCheck);
    } else {
        idleCheck = new NoopIdle();
    }

    queueNames = getNotEmpty(config.agentNames());
    pkgType = requireNonNull(packageBuilder.getType());

    Consumer<PackageStatusMessage> statusSender = messagingProvider.createSender(topics.getStatusTopic());
    Consumer<LogMessage> logSender = messagingProvider.createSender(topics.getDiscoveryTopic());
    String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
    BookKeeperConfig bkConfig = new BookKeeperConfig(
            subAgentName,
            subSlingId,
            config.editable(),
            config.maxRetries(),
            config.packageHandling(),
            packageNodeName,
            config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
    bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender);

    // Resume right after the stored offset. When the computed start offset is not positive,
    // no explicit assignment is passed and the poller is created with Reset.latest only.
    // NOTE(review): presumably loadOffset() returns a negative value when nothing is stored - confirm.
    long startOffset = bookKeeper.loadOffset() + 1;
    String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;

    packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
            HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage),
            HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage));

    queueThread = startBackgroundThread(this::processQueue,
            format("Queue Processor for Subscriber agent %s", subAgentName));

    int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
    announcer = new Announcer(subSlingId, subAgentName, queueNames,
            messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
            config.maxRetries(), config.editable(), announceDelay);

    LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", subAgentName, startOffset,
            queueNames);
}