public void activate()

in src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java [165:225]


    /**
     * Activates the subscriber agent: validates the collaborators, builds the
     * book keeper, starts polling the package topic and the background queue
     * processing thread, and creates the announcer.
     *
     * @param config     subscriber configuration (agent name, agent names to
     *                   subscribe to, retries, package handling, ...)
     * @param context    bundle context, handed to the idle check when
     *                   {@code config.subscriberIdleCheck()} is enabled
     * @param properties component properties; the optional keys
     *                   {@code idleMillies} and {@code announceDelay} are read
     *                   from it
     * @throws NullPointerException if an argument or one of the checked
     *                              collaborators is {@code null}
     */
    public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
        // Validate every reference BEFORE it is dereferenced, so the checks
        // below are actually effective.
        requireNonNull(config);
        requireNonNull(context);
        requireNonNull(properties);
        requireNonNull(packageBuilder);
        requireNonNull(slingSettings);
        requireNonNull(messagingProvider);
        requireNonNull(topics);
        requireNonNull(precondition);
        requireNonNull(bookKeeperFactory);

        String subSlingId = requireNonNull(slingSettings.getSlingId());
        subAgentName = requireNotBlank(config.name());

        if (config.editable()) {
            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, delay::signal);
        }

        if (config.subscriberIdleCheck()) {
            // Unofficial config (currently just for test)
            // Read through PropertiesUtil (as done for announceDelay) so a
            // missing, null or non-Integer value falls back to the default
            // instead of failing on a raw cast.
            int idleMillies = PropertiesUtil.toInteger(properties.get("idleMillies"),
                    SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
            AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);

            idleCheck = new SubscriberIdle(idleMillies, SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, readyHolder);
            idleReadyCheck = new SubscriberIdleCheck(context, idleCheck);
        } else {
            idleCheck = new NoopIdle();
        }

        queueNames = getNotEmpty(config.agentNames());
        pkgType = requireNonNull(packageBuilder.getType());

        Consumer<PackageStatusMessage> statusSender = messagingProvider.createSender(topics.getStatusTopic());
        Consumer<LogMessage> logSender = messagingProvider.createSender(topics.getDiscoveryTopic());

        String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
        BookKeeperConfig bkConfig = new BookKeeperConfig(
                subAgentName,
                subSlingId,
                config.editable(),
                config.maxRetries(),
                config.packageHandling(),
                packageNodeName,
                config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender);

        // Start one past the offset loaded from the book keeper. When that is
        // not positive no explicit assignment is passed and the poller is
        // created with Reset.latest only.
        // NOTE(review): presumably a non-positive value means no offset has
        // been stored yet - confirm against BookKeeper.loadOffset().
        long startOffset = bookKeeper.loadOffset() + 1;
        String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;

        packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
                HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage));

        queueThread = startBackgroundThread(this::processQueue,
                format("Queue Processor for Subscriber agent %s", subAgentName));

        int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
        announcer = new Announcer(subSlingId, subAgentName, queueNames,
                messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
                config.maxRetries(), config.editable(), announceDelay);

        LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", subAgentName, startOffset,
                queueNames);
    }