in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java [199:388]
/**
 * Computes the next round of incremental cooperative assignments for the cluster.
 *
 * <p>Using the previous assignment, the configured connectors/tasks, and the connectors/tasks
 * each worker currently reports, this derives the deleted, duplicated, lost and newly-created
 * connectors and tasks:
 * <ul>
 *   <li>Deleted and duplicated ones are revoked from the workers that hold them.</li>
 *   <li>Newly-created ones and lost ones that are ready for reassignment are distributed
 *       across the workers.</li>
 *   <li>When no delayed rebalance is active ({@code delay == 0}), load-balancing revocations
 *       may also be issued. If revocations happen in consecutive rounds, a backoff delay is
 *       computed instead of revoking immediately, unless that delay is 0.</li>
 * </ul>
 *
 * <p>This method is stateful. It reads and updates the assignor fields
 * {@code previousAssignment}, {@code previousRevocation}, {@code previousGenerationId},
 * {@code previousMembers}, {@code delay}, {@code scheduledRebalance},
 * {@code revokedInPrevious} and {@code numSuccessiveRevokingRebalances}. The helpers it calls
 * (e.g. {@code handleLostAssignments}) are not visible here and may touch further state.
 *
 * @param configSnapshot the cluster configuration. Its {@code connectors()}, and the
 *     {@code tasks} of each of those connectors, form the configured set.
 * @param lastCompletedGenerationId the last completed generation. If it differs from
 *     {@code previousGenerationId}, the cached previous assignment is discarded as possibly
 *     outdated.
 * @param currentGenerationId the generation being assigned. It is recorded as
 *     {@code previousGenerationId} for the next call.
 * @param memberAssignments the connectors and tasks currently held by each worker, keyed by
 *     worker (member) ID
 * @return a {@code ClusterAssignment} built from, in constructor order:
 *     <ul>
 *       <li>the incremental connector and task assignments, i.e. the next assignment minus the
 *           current one (the current one excludes deleted connectors/tasks);</li>
 *       <li>the connectors and tasks revoked from each worker;</li>
 *       <li>the full next connector and task assignments with the revocations removed.</li>
 *     </ul>
 */
ClusterAssignment performTaskAssignment(
ClusterConfigState configSnapshot,
int lastCompletedGenerationId,
int currentGenerationId,
Map<String, ConnectorsAndTasks> memberAssignments
) {
// Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
// can be used to calculate derived sets
log.debug("Previous assignments: {}", previousAssignment);
// The generation recorded at the end of the last call must match the last completed
// generation. Otherwise the cached previous assignment may never have been synced to the
// workers, so it is dropped instead of being trusted.
if (previousGenerationId != lastCompletedGenerationId) {
log.debug("Clearing the view of previous assignments due to generation mismatch between "
+ "previous generation ID {} and last completed generation ID {}. This can "
+ "happen if the leader fails to sync the assignment within a rebalancing round. "
+ "The following view of previous assignments might be outdated and will be "
+ "ignored by the leader in the current computation of new assignments. "
+ "Possibly outdated previous assignments: {}",
previousGenerationId, lastCompletedGenerationId, previousAssignment);
this.previousAssignment = ConnectorsAndTasks.EMPTY;
}
// Configured connectors (kept in sorted TreeSet order) and the tasks of each of them
Set<String> configuredConnectors = new TreeSet<>(configSnapshot.connectors());
Set<ConnectorTaskId> configuredTasks = combineCollections(configuredConnectors, configSnapshot::tasks, Collectors.toSet());
// Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
// be used to calculate derived sets
ConnectorsAndTasks configured = new ConnectorsAndTasks.Builder()
.with(configuredConnectors, configuredTasks).build();
log.debug("Configured assignments: {}", configured);
// Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
// used to calculate derived sets
ConnectorsAndTasks activeAssignments = assignment(memberAssignments);
log.debug("Active assignments: {}", activeAssignments);
// If the previous round recorded a revocation, check whether any of the revoked connectors or
// tasks are still active. If so, that revocation did not take effect. In that case, rebase
// previousAssignment on the active assignment so the revocation can be re-applied. The
// recorded revocation is cleared either way.
if (!previousRevocation.isEmpty()) {
if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
|| previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
previousAssignment = activeAssignments;
}
previousRevocation.connectors().clear();
previousRevocation.tasks().clear();
}
// Derived set: The set of deleted connectors-and-tasks is a derived set from the set
// difference of previous - configured
ConnectorsAndTasks deleted = diff(previousAssignment, configured);
log.debug("Deleted assignments: {}", deleted);
// The connectors and tasks that are currently running on more than one worker each
ConnectorsAndTasks duplicated = duplicatedAssignments(memberAssignments);
log.trace("Duplicated assignments: {}", duplicated);
// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
// the set difference of previous - active - deleted
ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
log.debug("Lost assignments: {}", lostAssignments);
// Derived set: The set of new connectors-and-tasks is a derived set from the set
// difference of configured - previous - active
ConnectorsAndTasks created = diff(configured, previousAssignment, activeAssignments);
log.debug("Created: {}", created);
// A collection of the current assignment excluding the connectors-and-tasks to be deleted
List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberAssignments, deleted);
// Per-worker revocations. This accumulates the deleted and duplicated revocations below, plus
// any load-balancing revocations, and is built into the returned revocations at the end.
Map<String, ConnectorsAndTasks.Builder> toRevoke = new HashMap<>();
Map<String, ConnectorsAndTasks> deletedToRevoke = intersection(deleted, memberAssignments);
log.debug("Deleted connectors and tasks to revoke from each worker: {}", deletedToRevoke);
addAll(toRevoke, deletedToRevoke);
// Revoking redundant connectors/tasks if the workers have duplicate assignments
Map<String, ConnectorsAndTasks> duplicatedToRevoke = intersection(duplicated, memberAssignments);
log.debug("Duplicated connectors and tasks to revoke from each worker: {}", duplicatedToRevoke);
addAll(toRevoke, duplicatedToRevoke);
// Compute the assignment that will be applied across the cluster after this round of rebalance
// Later on, new submissions and lost-and-reassigned connectors and tasks will be added to these assignments,
// and load-balancing revocations will be removed from them.
List<WorkerLoad> nextWorkerAssignment = workerLoads(memberAssignments);
removeAll(nextWorkerAssignment, deletedToRevoke);
removeAll(nextWorkerAssignment, duplicatedToRevoke);
// Collect the lost assignments that are ready to be reassigned because the workers that were
// originally responsible for them appear to have left the cluster instead of rejoining within
// the scheduled rebalance delay. These assignments will be re-allocated to the existing workers
// in the cluster later on
ConnectorsAndTasks.Builder lostAssignmentsToReassignBuilder = new ConnectorsAndTasks.Builder();
handleLostAssignments(lostAssignments, lostAssignmentsToReassignBuilder, nextWorkerAssignment);
ConnectorsAndTasks lostAssignmentsToReassign = lostAssignmentsToReassignBuilder.build();
// Do not revoke resources for re-assignment while a delayed rebalance is active
// NOTE(review): `delay` is assignor state and is read only after handleLostAssignments(...) has
// run. Presumably that helper can change it; confirm before reordering these statements.
if (delay == 0) {
Map<String, ConnectorsAndTasks> loadBalancingRevocations =
performLoadBalancingRevocations(configured, nextWorkerAssignment);
// If this round and the previous round involved revocation, we will calculate a delay for
// the next round when revoking rebalance would be allowed. Note that delay could be 0, in which
// case we would always revoke.
if (revokedInPrevious && !loadBalancingRevocations.isEmpty()) {
numSuccessiveRevokingRebalances++; // Should we consider overflow for this?
log.debug("Consecutive revoking rebalances observed. Computing delay and next scheduled rebalance.");
// The delay comes from the consecutive-revocation backoff for the current count of
// successive revoking rounds, narrowed to int
delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
if (delay != 0) {
// Defer the load-balancing revocations: they are neither added to toRevoke nor removed
// from nextWorkerAssignment in this round
scheduledRebalance = time.milliseconds() + delay;
log.debug("Skipping revocations in the current round with a delay of {}ms. Next scheduled rebalance:{}",
delay, scheduledRebalance);
} else {
log.debug("Revoking assignments immediately since scheduled.rebalance.max.delay.ms is set to 0");
addAll(toRevoke, loadBalancingRevocations);
// Remove all newly-revoked connectors and tasks from the next assignment, both to
// ensure that they are not included in the assignments during this round, and to produce
// an accurate allocation of all newly-created and lost-and-reassigned connectors and tasks
// that will have to be distributed across the cluster during this round
removeAll(nextWorkerAssignment, loadBalancingRevocations);
}
} else if (!loadBalancingRevocations.isEmpty()) {
// We had a revocation in this round but not in the previous round. Let's store that state.
log.debug("Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance");
addAll(toRevoke, loadBalancingRevocations);
removeAll(nextWorkerAssignment, loadBalancingRevocations);
revokedInPrevious = true;
} else if (revokedInPrevious) {
// No revocations in this round but the previous round had one. Probably the workers
// have converged to a balanced load. We can reset the rebalance clock
log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a " +
"balanced load. Resetting the exponential backoff clock");
revokedInPrevious = false;
numSuccessiveRevokingRebalances = 0;
} else {
// no-op
log.debug("No revocations in previous and current round.");
}
} else {
// A delay is active: no load-balancing revocations are computed this round, and the
// "revoked in previous round" flag is cleared
// NOTE(review): toRevoke holds ConnectorsAndTasks.Builder values. Confirm that Builder overrides
// toString; otherwise this log prints object identities rather than the pending revocations.
log.debug("Delayed rebalance is active. Delaying {}ms before revoking connectors and tasks: {}", delay, toRevoke);
revokedInPrevious = false;
}
// The complete set of connectors and tasks that should be newly-assigned during this round
ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
.addAll(created)
.addAll(lostAssignmentsToReassign)
.build();
// Distribute the connectors and tasks to assign across nextWorkerAssignment (presumably by
// adding them to its WorkerLoad entries in place, since it is read back just below)
assignConnectors(nextWorkerAssignment, toAssign.connectors());
assignTasks(nextWorkerAssignment, toAssign.tasks());
// Full next assignment per worker. Collectors.toMap has no merge function here, so a worker
// appearing twice in nextWorkerAssignment would throw IllegalStateException.
Map<String, Collection<String>> nextConnectorAssignments = nextWorkerAssignment.stream()
.collect(Collectors.toMap(
WorkerLoad::worker,
WorkerLoad::connectors
));
Map<String, Collection<ConnectorTaskId>> nextTaskAssignments = nextWorkerAssignment.stream()
.collect(Collectors.toMap(
WorkerLoad::worker,
WorkerLoad::tasks
));
// Current assignment per worker, excluding deleted connectors/tasks
Map<String, Collection<String>> currentConnectorAssignments =
currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
// Incremental assignments: what each worker will have next minus what it currently has
// (presumably a per-worker set difference; see diff)
Map<String, Collection<String>> incrementalConnectorAssignments =
diff(nextConnectorAssignments, currentConnectorAssignments);
Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
diff(nextTaskAssignments, currentTaskAssignments);
Map<String, ConnectorsAndTasks> revoked = buildAll(toRevoke);
// Record state for the next call
previousAssignment = computePreviousAssignment(revoked, nextConnectorAssignments, nextTaskAssignments, lostAssignments);
previousGenerationId = currentGenerationId;
// NOTE(review): keySet() is a live view of the caller's map, not a copy. If memberAssignments
// is mutated after this call, previousMembers changes with it. Confirm callers never do that.
previousMembers = memberAssignments.keySet();
log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
Map<String, Collection<String>> revokedConnectors = transformValues(revoked, ConnectorsAndTasks::connectors);
Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(revoked, ConnectorsAndTasks::tasks);
// Arguments: incremental connectors, incremental tasks, revoked connectors, revoked tasks,
// then the full next connector/task assignments with the revocations removed
return new ClusterAssignment(
incrementalConnectorAssignments,
incrementalTaskAssignments,
revokedConnectors,
revokedTasks,
diff(nextConnectorAssignments, revokedConnectors),
diff(nextTaskAssignments, revokedTasks)
);
}