in src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java [149:187]
/**
 * Activates this publisher agent.
 *
 * <p>Reads the agent name and queued timeout from {@code config}, creates the
 * sender for the package topic, sets up the distribution log event listener,
 * the JMX bean, the subscriber-count gauge and the status-topic poller, and
 * finally registers this instance as a {@link DistributionAgent} service.
 *
 * <p>The service registration is deliberately the last initialisation step:
 * this instance is only handed to the service registry once every field
 * assigned here has been set, and a failure in any earlier step cannot leave
 * a service registration behind.
 *
 * @param config  the publisher configuration; its {@code name()} must not be blank
 * @param context the bundle context used for the log event listener and for
 *                registering the {@link DistributionAgent} service
 * @throws RuntimeException if the JMX bean cannot be created because it is
 *                          not a compliant MBean
 */
public void activate(PublisherConfiguration config, BundleContext context) {
    requireNonNull(factory);
    requireNonNull(distributionMetricsService);
    pubAgentName = requireNotBlank(config.name());
    queuedTimeout = config.queuedTimeout();
    pkgType = packageBuilder.getType();
    this.sender = messagingProvider.createSender(topics.getPackageTopic());

    // Computed up front, but only used for the registration at the end.
    Dictionary<String, Object> props = createServiceProps(config);

    distributionLogEventListener = new DistributionLogEventListener(context, log, pubAgentName);

    DistPublisherJMX bean;
    try {
        bean = new DistPublisherJMX(pubAgentName, discoveryService, this);
    } catch (NotCompliantMBeanException e) {
        // Keep the unchecked exception type, but say which agent failed and preserve the cause.
        throw new RuntimeException("Unable to create JMX bean for publisher agent " + pubAgentName, e);
    }
    reg = new JMXRegistration(bean, "agent", pubAgentName);

    distributionMetricsService.createGauge(
            DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
            () -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
    );

    statusPoller = messagingProvider.createPoller(
            topics.getStatusTopic(),
            Reset.earliest,
            HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus)
    );

    // Publish the agent last: consumers must never see a partially initialised
    // instance, and an exception above must not leave a stale registration.
    // NOTE(review): sender, log listener, JMX registration and poller created above are
    // still not released if a later step throws — confirm their close APIs and add cleanup.
    componentReg = requireNonNull(context.registerService(DistributionAgent.class, this, props));

    String msg = format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
            pubAgentName, pkgType, queuedTimeout);
    log.info(msg);
}