in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java [411:578]
public void tick() {
// The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
// as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
// performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
blocking this thread (especially those in callbacks triggered by rebalance events).
try {
// If we failed to read to the end of the config log before, make sure the issue is resolved before joining the group.
// Joining and immediately leaving for failure to read configs is exceedingly impolite
if (!canReadConfigs) {
if (readConfigToEnd(workerSyncTimeoutMs)) {
canReadConfigs = true;
} else {
return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
}
}
log.debug("Ensuring group membership is still active");
String stageDescription = "ensuring membership in the cluster";
member.ensureActive(() -> new TickThreadStage(stageDescription));
completeTickThreadStage();
// Ensure we're in a good state in our group. If not, restart; everything should be set up to rejoin
if (!handleRebalanceCompleted()) return;
} catch (WakeupException e) {
// May be due to a request from another thread, or might be stopping. If the latter, we need to check the
// flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
// unless we're in the group.
log.trace("Woken up while ensure group membership is still active");
return;
}
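// A previous write to the config topic may have been fenced off, presumably by a newer leader
// claiming write privileges. Decide whether to reclaim those privileges or accept the demotion.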
if (fencedFromConfigTopic) {
if (isLeader()) {
// We were accidentally fenced out, possibly by a zombie leader
try {
log.debug("Reclaiming write privileges for config topic after being fenced out");
try (TickThreadStage stage = new TickThreadStage("reclaiming write privileges for the config topic")) {
configBackingStore.claimWritePrivileges();
}
fencedFromConfigTopic = false;
log.debug("Successfully reclaimed write privileges for config topic after being fenced out");
} catch (Exception e) {
log.warn("Unable to claim write privileges for config topic. Will backoff and possibly retry if still the leader", e);
backoff(CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS);
return;
}
} else {
log.trace("Relinquished write privileges for config topic after being fenced out, since worker is no longer the leader of the cluster");
// We were meant to be fenced out because we fell out of the group and a new leader was elected
fencedFromConfigTopic = false;
}
}
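// Distribute a new session key for signing internal (worker-to-worker) REST requests once the
// current key has expired or when no key has been distributed yet; only the leader does this.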
long now = time.milliseconds();
if (checkForKeyRotation(now)) {
log.debug("Distributing new session key");
keyExpiration = Long.MAX_VALUE;
try {
SessionKey newSessionKey = new SessionKey(keyGenerator.generateKey(), now);
writeToConfigTopicAsLeader(
"writing a new session key to the config topic",
() -> configBackingStore.putSessionKey(newSessionKey)
);
} catch (Exception e) {
log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying", e);
canReadConfigs = false;
return;
}
}
// Process any external requests
// TODO: Some of these can be performed concurrently or even optimized away entirely.
// For example, if three different connectors are slated to be restarted, it's fine to
// restart all three at the same time instead.
// Another example: if multiple configurations are submitted for the same connector,
// only the most recently submitted one actually has to be written to the config topic.
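// Run every external request whose scheduled time has arrived; the first request still in the
// future (if any) helps bound how long the upcoming poll may block.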
Long scheduledTick = null;
while (true) {
final DistributedHerderRequest next = peekWithoutException();
if (next == null) {
break;
} else if (now >= next.at) {
currentRequest = requests.pollFirst();
} else {
scheduledTick = next.at;
break;
}
runRequest(next.action(), next.callback());
}
// Process all pending connector restart requests
processRestartRequests();
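// If a rebalance has been deferred (the incremental cooperative protocol can delay reassignment
// after a worker drops out), make sure the upcoming poll wakes up in time for it.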
if (scheduledRebalance < Long.MAX_VALUE) {
scheduledTick = scheduledTick != null ? Math.min(scheduledTick, scheduledRebalance) : scheduledRebalance;
rebalanceResolved = false;
log.debug("Scheduled rebalance at: {} (now: {} scheduledTick: {}) ",
scheduledRebalance, now, scheduledTick);
}
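// Likewise, wake the poll in time to rotate the session key once it expires, if internal request
// signing is enabled and this worker is the leader.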
if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
scheduledTick = scheduledTick != null ? Math.min(scheduledTick, keyExpiration) : keyExpiration;
log.debug("Scheduled next key rotation at: {} (now: {} scheduledTick: {}) ",
keyExpiration, now, scheduledTick);
}
// Process any configuration updates
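// Updates from the config topic are recorded by the config backing store's listener on another
// thread; the update methods below copy the pending sets into these holders (under the herder's
// lock) so they can be processed here without holding that lock.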
AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<>();
AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<>();
AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<>();
boolean shouldReturn;
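// With the eager (V0) protocol, a pending reconfiguration that requires a rebalance causes an
// immediate return before the copied updates are processed; the incremental cooperative path
// processes the copies first and only then decides whether to return.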
if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
connectorTargetStateChangesCopy);
// With the eager protocol we should return immediately if needsReconfigRebalance has
// been set, in order to retain the old workflow
if (shouldReturn) {
return;
}
if (connectorConfigUpdatesCopy.get() != null) {
processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
}
if (connectorTargetStateChangesCopy.get() != null) {
processTargetStateChanges(connectorTargetStateChangesCopy.get());
}
} else {
shouldReturn = updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
connectorTargetStateChangesCopy, taskConfigUpdatesCopy);
if (connectorConfigUpdatesCopy.get() != null) {
processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
}
if (connectorTargetStateChangesCopy.get() != null) {
processTargetStateChanges(connectorTargetStateChangesCopy.get());
}
if (taskConfigUpdatesCopy.get() != null) {
processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
}
if (shouldReturn) {
return;
}
}
// Let the group take any actions it needs to
try {
long nextRequestTimeoutMs = scheduledTick != null ? Math.max(scheduledTick - time.milliseconds(), 0L) : Long.MAX_VALUE;
log.trace("Polling for group activity; will wait for {}ms or until poll is interrupted by "
+ "either config backing store updates or a new external request",
nextRequestTimeoutMs);
String pollDurationDescription = scheduledTick != null ? "for up to " + nextRequestTimeoutMs + "ms or " : "";
String stageDescription = "polling the group coordinator " + pollDurationDescription + "until interrupted";
member.poll(nextRequestTimeoutMs, () -> new TickThreadStage(stageDescription));
completeTickThreadStage();
// Ensure we're in a good state in our group. If not, restart; everything should be set up to rejoin
handleRebalanceCompleted();
} catch (WakeupException e) { // FIXME should not be WakeupException
log.trace("Woken up while polling for group activity");
// Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
}
}