in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java [310:439]
public void tick() {
// The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
// as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
// performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
// blocking up this thread (especially those in callbacks due to rebalance events).
try {
// if we failed to read to end of log before, we need to make sure the issue was resolved before joining group
// Joining and immediately leaving for failure to read configs is exceedingly impolite
if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
log.debug("Ensuring group membership is still active");
member.ensureActive();
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
if (!handleRebalanceCompleted()) return;
} catch (WakeupException e) {
// May be due to a request from another thread, or might be stopping. If the latter, we need to check the
// flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
// unless we're in the group.
log.trace("Woken up while ensure group membership is still active");
return;
}
long now = time.milliseconds();
if (checkForKeyRotation(now)) {
log.debug("Distributing new session key");
keyExpiration = Long.MAX_VALUE;
configBackingStore.putSessionKey(new SessionKey(
keyGenerator.generateKey(),
now
));
}
// Process any external requests
// TODO: Some of these can be performed concurrently or even optimized away entirely.
// For example, if three different connectors are slated to be restarted, it's fine to
// restart all three at the same time instead.
// Another example: if multiple configurations are submitted for the same connector,
// the only one that actually has to be written to the config topic is the
// most-recently one.
long nextRequestTimeoutMs = Long.MAX_VALUE;
while (true) {
final DistributedHerderRequest next = peekWithoutException();
if (next == null) {
break;
} else if (now >= next.at) {
requests.pollFirst();
} else {
nextRequestTimeoutMs = next.at - now;
break;
}
try {
next.action().call();
next.callback().onCompletion(null, null);
} catch (Throwable t) {
next.callback().onCompletion(t, null);
}
}
if (scheduledRebalance < Long.MAX_VALUE) {
nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(scheduledRebalance - now, 0));
rebalanceResolved = false;
log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ",
scheduledRebalance, now, nextRequestTimeoutMs);
}
if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0));
log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ",
keyExpiration, now, nextRequestTimeoutMs);
}
// Process any configuration updates
AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<>();
AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<>();
AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<>();
boolean shouldReturn;
if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
connectorTargetStateChangesCopy);
// With eager protocol we should return immediately if needsReconfigRebalance has
// been set to retain the old workflow
if (shouldReturn) {
return;
}
if (connectorConfigUpdatesCopy.get() != null) {
processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
}
if (connectorTargetStateChangesCopy.get() != null) {
processTargetStateChanges(connectorTargetStateChangesCopy.get());
}
} else {
shouldReturn = updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
connectorTargetStateChangesCopy, taskConfigUpdatesCopy);
if (connectorConfigUpdatesCopy.get() != null) {
processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
}
if (connectorTargetStateChangesCopy.get() != null) {
processTargetStateChanges(connectorTargetStateChangesCopy.get());
}
if (taskConfigUpdatesCopy.get() != null) {
processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
}
if (shouldReturn) {
return;
}
}
// Let the group take any actions it needs to
try {
log.trace("Polling for group activity; will wait for {}ms or until poll is interrupted by "
+ "either config backing store updates or a new external request",
nextRequestTimeoutMs);
member.poll(nextRequestTimeoutMs);
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
handleRebalanceCompleted();
} catch (WakeupException e) { // FIXME should not be WakeupException
log.trace("Woken up while polling for group activity");
// Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
}
}