public void tick()

in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java [310:439]


    public void tick() {
        // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
        // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
        // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
        // blocking this thread (especially those in callbacks due to rebalance events). A sketch of the worker thread
        // that drives this loop appears after the method.

        try {
            // if we failed to read to the end of the log before, we need to make sure the issue was resolved before joining the group
            // Joining and immediately leaving for failure to read configs is exceedingly impolite
            if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
                return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us

            log.debug("Ensuring group membership is still active");
            member.ensureActive();
            // Ensure we're in a good state in our group. If not, restart; everything should be set up to rejoin
            if (!handleRebalanceCompleted()) return;
        } catch (WakeupException e) {
            // May be due to a request from another thread, or might be stopping. If the latter, we need to check the
            // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
            // unless we're in the group.
            log.trace("Woken up while ensure group membership is still active");
            return;
        }

        long now = time.milliseconds();

        if (checkForKeyRotation(now)) {
            log.debug("Distributing new session key");
            keyExpiration = Long.MAX_VALUE;
            configBackingStore.putSessionKey(new SessionKey(
                keyGenerator.generateKey(),
                now
            ));
        }

        // Process any external requests
        // TODO: Some of these can be performed concurrently or even optimized away entirely.
        //       For example, if three different connectors are slated to be restarted, it's fine to
        //       restart all three at the same time instead.
        //       Another example: if multiple configurations are submitted for the same connector,
        //       the only one that actually has to be written to the config topic is the
        //       most recently submitted one.
        long nextRequestTimeoutMs = Long.MAX_VALUE;
        while (true) {
            final DistributedHerderRequest next = peekWithoutException();
            if (next == null) {
                break;
            } else if (now >= next.at) {
                requests.pollFirst();
            } else {
                nextRequestTimeoutMs = next.at - now;
                break;
            }

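            // Only requests whose scheduled time has arrived reach this point;
            // run the action and report success or failure through the callback.
            // (The queue plumbing assumed here is sketched after this method.)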
            try {
                next.action().call();
                next.callback().onCompletion(null, null);
            } catch (Throwable t) {
                next.callback().onCompletion(t, null);
            }
        }

        if (scheduledRebalance < Long.MAX_VALUE) {
            nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(scheduledRebalance - now, 0));
            rebalanceResolved = false;
            log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ",
                    scheduledRebalance, now, nextRequestTimeoutMs);
        }
        if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
            nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0));
            log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ",
                    keyExpiration, now, nextRequestTimeoutMs);
        }

        // Process any configuration updates (snapshotted off the listener thread; see the handoff sketch after this method)
        AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<>();
        AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<>();
        AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<>();

        boolean shouldReturn;
        if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
            shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
                    connectorTargetStateChangesCopy);
            // With the eager protocol, return immediately if needsReconfigRebalance has
            // been set, in order to retain the old workflow
            if (shouldReturn) {
                return;
            }

            if (connectorConfigUpdatesCopy.get() != null) {
                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
            }

            if (connectorTargetStateChangesCopy.get() != null) {
                processTargetStateChanges(connectorTargetStateChangesCopy.get());
            }
        } else {
            shouldReturn = updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
                    connectorTargetStateChangesCopy, taskConfigUpdatesCopy);

            if (connectorConfigUpdatesCopy.get() != null) {
                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
            }

            if (connectorTargetStateChangesCopy.get() != null) {
                processTargetStateChanges(connectorTargetStateChangesCopy.get());
            }

            if (taskConfigUpdatesCopy.get() != null) {
                processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
            }

            if (shouldReturn) {
                return;
            }
        }

        // Let the group take any actions it needs to
        try {
            log.trace("Polling for group activity; will wait for {}ms or until poll is interrupted by "
                    + "either config backing store updates or a new external request",
                    nextRequestTimeoutMs);
            member.poll(nextRequestTimeoutMs);
            // Ensure we're in a good state in our group. If not, restart; everything should be set up to rejoin
            handleRebalanceCompleted();
        } catch (WakeupException e) { // FIXME should not be WakeupException
            log.trace("Woken up while polling for group activity");
            // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
        }
    }
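
For context, tick() does not loop on its own: it is driven repeatedly by the herder's work thread until shutdown. A minimal sketch of that driver, assuming a stopping flag plus startServices()/halt() hooks (those names are assumptions for illustration, not necessarily the real method's shape):

    // Hedged sketch: the work thread calls tick() in a loop until stopped.
    public void run() {
        try {
            startServices();             // assumed startup hook
            while (!stopping.get())      // stopping: an AtomicBoolean flipped on shutdown
                tick();                  // one pass of the main loop above
            halt();                      // assumed orderly-shutdown hook
        } catch (Throwable t) {
            log.error("Uncaught exception in herder work thread, exiting: ", t);
        }
    }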
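checkForKeyRotation(now) is not shown in this excerpt. A plausible sketch, assuming that only the group leader distributes session keys and that sessionKey and isLeader() exist as a field and a helper (both are assumed names; keyExpiration is the field used in tick() above):

    // Hedged sketch of checkForKeyRotation; sessionKey and isLeader() are assumptions.
    private boolean checkForKeyRotation(long now) {
        if (!isLeader() || !internalRequestValidationEnabled())
            return false;                // only the leader rotates session keys
        // Rotate if no key has ever been distributed, or the current one has expired
        return sessionKey == null || now >= keyExpiration;
    }

Note that keyExpiration also feeds nextRequestTimeoutMs later in tick(): each pending absolute deadline contributes max(deadline - now, 0), so the member.poll() call at the bottom wakes in time for whichever comes first: a due request, a scheduled rebalance, or a key rotation.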
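The request-draining loop relies on requests being ordered by scheduled execution time and on peekWithoutException() returning null instead of throwing when the queue is empty. A sketch of plumbing with those semantics, assuming a concurrent sorted set keyed on the request's at field (uses java.util.NavigableSet, java.util.Comparator, java.util.NoSuchElementException, and java.util.concurrent.ConcurrentSkipListSet):

    // Sketch only: a time-ordered, thread-safe queue; first() is the next request due.
    private final NavigableSet<DistributedHerderRequest> requests =
            new ConcurrentSkipListSet<>(Comparator.comparingLong((DistributedHerderRequest req) -> req.at));

    private DistributedHerderRequest peekWithoutException() {
        try {
            return requests.isEmpty() ? null : requests.first();
        } catch (NoSuchElementException e) {
            // Another thread drained the set between isEmpty() and first();
            // treat that the same as an empty queue.
            return null;
        }
    }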
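The three AtomicReference holders exist because configuration updates arrive on the config backing store's listener thread, while all real processing must happen on this herder thread. A hedged sketch of the snapshot-and-clear handoff the updateConfigsWith* methods are assumed to perform for each holder (snapshotConfigUpdates and connectorConfigUpdates are illustrative names):

    // Sketch of the handoff: pending updates accumulate under the herder's monitor
    // on the listener thread and are swapped out here in one step, so tick() can
    // process a stable snapshot without holding the lock.
    private void snapshotConfigUpdates(AtomicReference<Set<String>> connectorConfigUpdatesCopy) {
        synchronized (this) {
            if (!connectorConfigUpdates.isEmpty()) {
                connectorConfigUpdatesCopy.set(connectorConfigUpdates);
                connectorConfigUpdates = new HashSet<>();   // fresh set for new arrivals
            }
        }
    }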