in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java [157:315]
/**
 * Computes the assignment of connectors and tasks for the current rebalance round and
 * returns it in serialized form, keyed by worker.
 *
 * <p>The computation starts from three base snapshots (the previous assignment, the
 * configured connectors-and-tasks, and the active assignments reported by the members)
 * and derives from them the deleted, lost and newly submitted connectors-and-tasks.
 * Deleted and duplicated resources are always scheduled for revocation; revocations for
 * load balancing are only computed when {@code canRevoke} holds and no delay is active.
 *
 * <p>Besides returning the assignment, this method updates the assignor's view of the
 * previous round ({@code previousAssignment}, {@code previousGenerationId},
 * {@code previousMembers}, {@code previousRevocation}, {@code canRevoke}) and publishes a
 * new {@code LeaderState} on the coordinator.
 *
 * @param leaderId the member ID of the leader; its URL is looked up in {@code memberConfigs}
 * @param maxOffset the offset forwarded to every generated assignment
 * @param memberConfigs the state reported by each member, keyed by worker
 * @param coordinator the coordinator supplying generation IDs and the config snapshot
 * @param protocolVersion the protocol version forwarded to every generated assignment
 * @return the serialized assignment for each member
 */
protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,
                                                        Map<String, ExtendedWorkerState> memberConfigs,
                                                        WorkerCoordinator coordinator, short protocolVersion) {
    log.debug("Performing task assignment during generation: {} with memberId: {}",
            coordinator.generationId(), coordinator.memberId());

    // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
    // can be used to calculate derived sets
    log.debug("Previous assignments: {}", previousAssignment);
    int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
    if (previousGenerationId != lastCompletedGenerationId) {
        log.debug("Clearing the view of previous assignments due to generation mismatch between "
                + "previous generation ID {} and last completed generation ID {}. This can "
                + "happen if the leader fails to sync the assignment within a rebalancing round. "
                + "The following view of previous assignments might be outdated and will be "
                + "ignored by the leader in the current computation of new assignments. "
                + "Possibly outdated previous assignments: {}",
                previousGenerationId, lastCompletedGenerationId, previousAssignment);
        this.previousAssignment = ConnectorsAndTasks.EMPTY;
    }

    ClusterConfigState snapshot = coordinator.configSnapshot();
    Set<String> configuredConnectors = new TreeSet<>(snapshot.connectors());
    Set<ConnectorTaskId> configuredTasks = configuredConnectors.stream()
            .flatMap(c -> snapshot.tasks(c).stream())
            .collect(Collectors.toSet());

    // Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
    // be used to calculate derived sets
    ConnectorsAndTasks configured = new ConnectorsAndTasks.Builder()
            .with(configuredConnectors, configuredTasks).build();
    log.debug("Configured assignments: {}", configured);

    // Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
    // used to calculate derived sets
    ConnectorsAndTasks activeAssignments = assignment(memberConfigs);
    log.debug("Active assignments: {}", activeAssignments);

    // If anything revoked in the previous round is still reported as active, that
    // revocation did not take effect. In this case, reset appropriately and be ready to
    // re-apply revocation of tasks
    if (!previousRevocation.isEmpty()) {
        if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
                || previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
            previousAssignment = activeAssignments;
            canRevoke = true;
        }
        previousRevocation.connectors().clear();
        previousRevocation.tasks().clear();
    }

    // Derived set: The set of deleted connectors-and-tasks is a derived set from the set
    // difference of previous - configured
    ConnectorsAndTasks deleted = diff(previousAssignment, configured);
    log.debug("Deleted assignments: {}", deleted);

    // Derived set: The set of remaining active connectors-and-tasks is a derived set from the
    // set difference of active - deleted
    ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
    log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);

    // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
    // the set difference of previous - active - deleted
    ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
    log.debug("Lost assignments: {}", lostAssignments);

    // Derived set: The set of new connectors-and-tasks is a derived set from the set
    // difference of configured - previous - active
    ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
    log.debug("New assignments: {}", newSubmissions);

    // A collection of the complete assignment
    List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
    log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);

    // Per worker connector assignments without removing deleted connectors yet
    Map<String, Collection<String>> connectorAssignments =
            completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
    log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);

    // Per worker task assignments without removing deleted connectors yet
    Map<String, Collection<ConnectorTaskId>> taskAssignments =
            completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
    log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments);

    // A collection of the current assignment excluding the connectors-and-tasks to be deleted
    List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberConfigs, deleted);

    Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
    log.debug("Connector and task to delete assignments: {}", toRevoke);

    // Revoking redundant connectors/tasks if the workers have duplicate assignments.
    // These are merged per worker rather than added with putAll, which would replace (and
    // thereby drop) the deletions already recorded for a worker that also holds duplicates.
    mergeRevocations(toRevoke, computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments));
    log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);

    // Recompute the complete assignment excluding the deleted connectors-and-tasks
    completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
    connectorAssignments =
            completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
    taskAssignments =
            completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));

    handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);

    // Do not revoke resources for re-assignment while a delayed rebalance is active
    // Also we do not revoke in two consecutive rebalances by the same leader
    canRevoke = delay == 0 && canRevoke;

    // Compute the connectors-and-tasks to be revoked for load balancing without taking into
    // account the deleted ones.
    log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
    if (canRevoke) {
        Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
                performTaskRevocation(activeAssignments, currentWorkerAssignment);

        mergeRevocations(toRevoke, toExplicitlyRevoke);
        // Logged after the merge so that the load-balancing revocations are visible too
        log.debug("Connector and task to revoke assignments: {}", toRevoke);

        // Allow revocation in the next round only if nothing was revoked for balancing now
        canRevoke = toExplicitlyRevoke.isEmpty();
    } else {
        canRevoke = delay == 0;
    }

    // The new submissions are added to completeWorkerAssignment; the incremental diffs below
    // are computed from the connectorAssignments/taskAssignments maps collected from it.
    assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
    assignTasks(completeWorkerAssignment, newSubmissions.tasks());

    log.debug("Current complete assignments: {}", currentWorkerAssignment);
    log.debug("New complete assignments: {}", completeWorkerAssignment);

    Map<String, Collection<String>> currentConnectorAssignments =
            currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
    Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
            currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));

    // Only the difference between the new and the current per-worker assignments is sent out
    Map<String, Collection<String>> incrementalConnectorAssignments =
            diff(connectorAssignments, currentConnectorAssignments);
    Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
            diff(taskAssignments, currentTaskAssignments);
    log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
    log.debug("Incremental task assignments: {}", incrementalTaskAssignments);

    coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments));

    Map<String, ExtendedAssignment> assignments =
            fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
                    memberConfigs.get(leaderId).url(), maxOffset, incrementalConnectorAssignments,
                    incrementalTaskAssignments, toRevoke, delay, protocolVersion);

    // Remember this round's outcome as the base for the next rebalance
    previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
    previousGenerationId = coordinator.generationId();
    previousMembers = memberConfigs.keySet();

    log.debug("Actual assignments: {}", assignments);
    return serializeAssignments(assignments);
}

/**
 * Merges the per-worker revocations in {@code additions} into {@code target}, creating an
 * entry for workers that have none yet. Connectors and tasks already scheduled for
 * revocation on a worker are not added a second time.
 *
 * @param target the revocations accumulated so far; modified in place
 * @param additions the revocations to merge in; not modified
 */
private static void mergeRevocations(Map<String, ConnectorsAndTasks> target,
                                     Map<String, ConnectorsAndTasks> additions) {
    additions.forEach((worker, extra) -> {
        ConnectorsAndTasks existing = target.computeIfAbsent(
                worker,
                v -> new ConnectorsAndTasks.Builder().build());
        for (String connector : extra.connectors()) {
            if (!existing.connectors().contains(connector)) {
                existing.connectors().add(connector);
            }
        }
        for (ConnectorTaskId task : extra.tasks()) {
            if (!existing.tasks().contains(task)) {
                existing.tasks().add(task);
            }
        }
    });
}