public DistributionSubscriber()

in src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java [135:200]


	public DistributionSubscriber(
			@Reference(name = "packageBuilder") DistributionPackageBuilder packageBuilder,
			@Reference SlingSettingsService slingSettings,
			@Reference MessagingProvider messagingProvider,
			@Reference(name = "precondition") Precondition precondition,
			@Reference MetricsService metricsService,
			@Reference BookKeeperFactory bookKeeperFactory,
			@Reference SubscriberReadyStore subscriberReadyStore,
			@Reference OnlyOnLeader onlyOnLeader,
			SubscriberConfiguration config,
			BundleContext context,
			Map<String, Object> properties) {
		// Wires up a subscriber agent from its injected services and configuration:
		// metrics, idle check, book keeper, optional command poller, package poller
		// and announcer are created here, in that order.
		// NOTE(review): onlyOnLeader is injected but never read in this constructor;
		// presumably the reference only gates activation of the component - confirm.

		// Mandatory identity values; the helpers reject missing/blank input.
		final String slingId = requireNonNull(slingSettings.getSlingId());
		this.subAgentName = requireNotBlank(config.name());
		this.precondition = requireNonNull(precondition);
		this.subscriberMetrics = new SubscriberMetrics(
				metricsService,
				subAgentName,
				getFirst(config.agentNames()),
				config.editable());

		// Idle tracking is optional: without it a no-op implementation is used and
		// idleReadyCheck is left unassigned.
		if (!config.subscriberIdleCheck()) {
			this.idleCheck = new NoopIdle();
		} else {
			AtomicBoolean sharedReadyFlag = subscriberReadyStore.getReadyHolder(subAgentName);
			this.idleCheck = new SubscriberReady(
					subAgentName,
					config.idleMillies(),
					config.forceReadyMillies(),
					config.acceptableAgeDiffMs(),
					sharedReadyFlag,
					System::currentTimeMillis);
			this.idleReadyCheck = new SubscriberIdleCheck(context, idleCheck, config.subscriberIdleTags());
		}

		this.queueNames = getNotEmpty(config.agentNames());
		this.pkgType = requireNonNull(packageBuilder.getType());

		// Outgoing channels handed to the book keeper (status and log messages).
		Consumer<PackageStatusMessage> statusOut = messagingProvider.createSender(Topics.STATUS_TOPIC);
		Consumer<LogMessage> logOut = messagingProvider.createSender(Topics.DISCOVERY_TOPIC);

		// Node names are derived from the messaging server URI plus the topic name.
		String pkgNode = escapeTopicName(messagingProvider.getServerUri(), Topics.PACKAGE_TOPIC);
		String cmdNode = escapeTopicName(messagingProvider.getServerUri(), Topics.COMMAND_TOPIC);
		BookKeeperConfig bookKeeperConfig = new BookKeeperConfig(
				subAgentName,
				slingId,
				config.editable(),
				config.maxRetries(),
				config.packageHandling(),
				pkgNode,
				cmdNode,
				config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
		this.bookKeeper = bookKeeperFactory.create(
				packageBuilder, bookKeeperConfig, statusOut, logOut, this.subscriberMetrics);

		// Only editable subscribers poll for commands; a received clear offset is
		// persisted through the book keeper and then the delay is signalled.
		if (config.editable()) {
			Consumer<Long> onClear = clearOffset -> {
				bookKeeper.storeClearOffset(clearOffset);
				delay.signal();
			};

			this.commandPoller = new CommandPoller(
					messagingProvider, slingId, subAgentName, bookKeeper.getClearOffset(), onClear);
		}

		// Continue right after the stored offset; when that yields no positive
		// offset, no explicit assignment is passed to the poller.
		final long firstOffset = bookKeeper.loadOffset() + 1;
		String assignment = null;
		if (firstOffset > 0) {
			assignment = messagingProvider.assignTo(firstOffset);
		}

		this.packagePoller = messagingProvider.createPoller(
				Topics.PACKAGE_TOPIC,
				Reset.latest,
				assignment,
				HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage),
				HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage));

		// "announceDelay" is read from the component properties, defaulting to 10000.
		int announceDelay = Converters.standardConverter()
				.convert(properties.get("announceDelay"))
				.defaultValue(10000)
				.to(Integer.class);
		this.announcer = new Announcer(
				slingId,
				subAgentName,
				queueNames,
				messagingProvider.createSender(Topics.DISCOVERY_TOPIC),
				bookKeeper,
				config.maxRetries(),
				config.editable(),
				announceDelay);

		LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}",
				subAgentName, firstOffset, queueNames, config.subscriberIdleCheck());
	}