public void tick()

in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java [411:578]


    public void tick() {
        // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
        // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
        // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
        // blocking up this thread (especially those in callbacks due to rebalance events).
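        // External requests are added to the `requests` queue by other threads (e.g. REST API handlers, via
        // addRequest) and are drained and executed here once their scheduled time arrives, so callers never
        // mutate herder state directly.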

        try {
            // if we failed to read to the end of the log before, we need to make sure the issue was resolved before joining the group
            // Joining and immediately leaving for failure to read configs is exceedingly impolite
            if (!canReadConfigs) {
                if (readConfigToEnd(workerSyncTimeoutMs)) {
                    canReadConfigs = true;
                } else {
                    return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
                }
            }

            log.debug("Ensuring group membership is still active");
            String stageDescription = "ensuring membership in the cluster";
            member.ensureActive(() -> new TickThreadStage(stageDescription));
            completeTickThreadStage();
            // Ensure we're in a good state in our group. If not, restart and everything should be set up to rejoin
            if (!handleRebalanceCompleted()) return;
        } catch (WakeupException e) {
            // May be due to a request from another thread, or might be stopping. If the latter, we need to check the
            // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
            // unless we're in the group.
            log.trace("Woken up while ensuring group membership is still active");
            return;
        }

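        // If this worker was fenced out of the config topic, either reclaim write privileges (if it is still
        // the leader and was fenced out accidentally) or simply clear the flag (if a new leader has
        // legitimately taken over).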
        if (fencedFromConfigTopic) {
            if (isLeader()) {
                // We were accidentally fenced out, possibly by a zombie leader
                try {
                    log.debug("Reclaiming write privileges for config topic after being fenced out");
                    try (TickThreadStage stage = new TickThreadStage("reclaiming write privileges for the config topic")) {
                        configBackingStore.claimWritePrivileges();
                    }
                    fencedFromConfigTopic = false;
                    log.debug("Successfully reclaimed write privileges for config topic after being fenced out");
                } catch (Exception e) {
                    log.warn("Unable to claim write privileges for config topic. Will back off and possibly retry if still the leader", e);
                    backoff(CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS);
                    return;
                }
            } else {
                log.trace("Relinquished write privileges for config topic after being fenced out, since worker is no longer the leader of the cluster");
                // We were meant to be fenced out because we fell out of the group and a new leader was elected
                fencedFromConfigTopic = false;
            }
        }

        long now = time.milliseconds();

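        // If the session key used to sign internal requests is due for rotation, generate a new one and
        // distribute it to the rest of the cluster by writing it to the config topic as the leader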
        if (checkForKeyRotation(now)) {
            log.debug("Distributing new session key");
            keyExpiration = Long.MAX_VALUE;
            try {
                SessionKey newSessionKey = new SessionKey(keyGenerator.generateKey(), now);
                writeToConfigTopicAsLeader(
                        "writing a new session key to the config topic",
                        () -> configBackingStore.putSessionKey(newSessionKey)
                );
            } catch (Exception e) {
                log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying", e);
                canReadConfigs = false;
                return;
            }
        }

        // Process any external requests
        // TODO: Some of these can be performed concurrently or even optimized away entirely.
        //       For example, if three different connectors are slated to be restarted, it's fine to
        //       restart all three at the same time instead.
        //       Another example: if multiple configurations are submitted for the same connector,
        //       the only one that actually has to be written to the config topic is the
        //       most recent one.
        Long scheduledTick = null;
        while (true) {
            final DistributedHerderRequest next = peekWithoutException();
            if (next == null) {
                break;
            } else if (now >= next.at) {
                currentRequest = requests.pollFirst();
            } else {
                scheduledTick = next.at;
                break;
            }

            runRequest(next.action(), next.callback());
        }

        // Process all pending connector restart requests
        processRestartRequests();

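        // Compute the next time this loop needs to wake up on its own: a pending (delayed) rebalance,
        // and/or the next session key rotation if this worker is the leader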
        if (scheduledRebalance < Long.MAX_VALUE) {
            scheduledTick = scheduledTick != null ? Math.min(scheduledTick, scheduledRebalance) : scheduledRebalance;
            rebalanceResolved = false;
            log.debug("Scheduled rebalance at: {} (now: {} scheduledTick: {}) ",
                    scheduledRebalance, now, scheduledTick);
        }
        if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
            scheduledTick = scheduledTick != null ? Math.min(scheduledTick, keyExpiration) : keyExpiration;
            log.debug("Scheduled next key rotation at: {} (now: {} scheduledTick: {}) ",
                    keyExpiration, now, scheduledTick);
        }

        // Process any configuration updates
        AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<>();
        AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<>();
        AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<>();

        boolean shouldReturn;
        if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
            shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
                    connectorTargetStateChangesCopy);
            // With the eager protocol we should return immediately if needsReconfigRebalance has
            // been set, in order to retain the old workflow
            if (shouldReturn) {
                return;
            }

            if (connectorConfigUpdatesCopy.get() != null) {
                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
            }

            if (connectorTargetStateChangesCopy.get() != null) {
                processTargetStateChanges(connectorTargetStateChangesCopy.get());
            }
        } else {
            shouldReturn = updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
                    connectorTargetStateChangesCopy, taskConfigUpdatesCopy);

            if (connectorConfigUpdatesCopy.get() != null) {
                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
            }

            if (connectorTargetStateChangesCopy.get() != null) {
                processTargetStateChanges(connectorTargetStateChangesCopy.get());
            }

            if (taskConfigUpdatesCopy.get() != null) {
                processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
            }

            if (shouldReturn) {
                return;
            }
        }

        // Let the group take any actions it needs to
        try {
            long nextRequestTimeoutMs = scheduledTick != null ? Math.max(scheduledTick - time.milliseconds(), 0L) : Long.MAX_VALUE;
            log.trace("Polling for group activity; will wait for {}ms or until poll is interrupted by "
                    + "either config backing store updates or a new external request",
                    nextRequestTimeoutMs);
            String pollDurationDescription = scheduledTick != null ? "for up to " + nextRequestTimeoutMs + "ms or " : "";
            String stageDescription = "polling the group coordinator " + pollDurationDescription + "until interrupted";
            member.poll(nextRequestTimeoutMs, () -> new TickThreadStage(stageDescription));
            completeTickThreadStage();
            // Ensure we're in a good state in our group. If not, restart and everything should be set up to rejoin
            handleRebalanceCompleted();
        } catch (WakeupException e) { // FIXME should not be WakeupException
            log.trace("Woken up while polling for group activity");
            // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
        }
    }
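
For context, tick() is driven by the herder's background work thread, which calls it repeatedly until shutdown is requested. Below is a minimal sketch of that driving loop, assuming helpers along the lines of the class's startServices()/halt() methods and its stopping flag; it is simplified, and the actual run() method also handles logging, exit codes, and the running state:

    public void run() {
        try {
            startServices();                // start the worker and the config/status backing stores
            while (!stopping.get()) {
                tick();                     // one pass of the membership/request-processing loop above
            }
            halt();                         // stop connectors and tasks, then leave the group
        } catch (Throwable t) {
            log.error("Uncaught exception in herder work thread, exiting: ", t);
        }
    }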