in src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java [113:157]
public DistributionPublisher(
        @Reference MessagingProvider messagingProvider,
        @Reference(name = "packageBuilder") DistributionPackageBuilder packageBuilder,
        @Reference DiscoveryService discoveryService,
        @Reference PackageMessageFactory factory,
        @Reference EventAdmin eventAdmin,
        @Reference MetricsService metricsService,
        @Reference PubQueueProvider pubQueueProvider,
        @Reference(
                target = "(osgi.condition.id=toggle.FT_SLING-12218)",
                cardinality = OPTIONAL,
                policyOption = GREEDY)
        Condition limitToggle,
        PublisherConfiguration config,
        BundleContext context) {
    /*
     * Builds the publisher agent from its injected collaborators and its configuration.
     *
     * - config.name() is passed through requireNotBlank and becomes the agent name used
     *   for metrics, the distribution log and the log event listener.
     * - factory and metricsService are rejected via requireNonNull when null.
     * - limitToggle is an OPTIONAL reference: it may be null, and the limit is treated
     *   as enabled exactly when a matching Condition was injected.
     * - A sender is created for Topics.PACKAGE_TOPIC.
     */

    // The agent name is resolved first because the metrics, log and listener below use it.
    this.pubAgentName = requireNotBlank(config.name());

    // Collaborators that are stored as-is.
    this.packageBuilder = packageBuilder;
    this.eventAdmin = eventAdmin;
    this.pubQueueProvider = pubQueueProvider;

    requireNonNull(factory);
    this.factory = factory;

    // Metrics for this agent; the queue size is reported through a supplier.
    this.publishMetrics = new PublishMetrics(requireNonNull(metricsService), pubAgentName);
    this.publishMetrics.queueSize(() -> pubQueueProvider.getMaxQueueSize(pubAgentName));

    // Distribution log and the listener that is constructed with it.
    this.distLog = new DefaultDistributionLog(pubAgentName, getClass(), DefaultDistributionLog.LogLevel.INFO);
    this.distributionLogEventListener = new DistributionLogEventListener(context, distLog, pubAgentName);

    // Presence of the optional toggle condition switches the limit on.
    this.limitEnabled = (limitToggle != null);

    // Queue related settings taken from the component configuration.
    this.queuedTimeout = config.queuedTimeout();
    this.queueSizeLimit = config.queueSizeLimit();
    this.maxQueueSizeDelay = config.maxQueueSizeDelay();

    this.pkgType = packageBuilder.getType();
    this.sender = messagingProvider.createSender(Topics.PACKAGE_TOPIC);

    // The subscriber count is reported through a supplier as well.
    this.publishMetrics.subscriberCount(() -> discoveryService.getSubscriberCount(pubAgentName));

    distLog.info(
            "Started Publisher agent={} with packageBuilder={}, limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
            pubAgentName, pkgType, limitEnabled, queuedTimeout, queueSizeLimit, maxQueueSizeDelay);
}