ClusterAssignment performTaskAssignment()

in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java [199:388]


    /**
     * Computes the connector and task assignments for the current rebalance round, together with
     * the connectors and tasks that should be revoked from each worker.
     *
     * <p>The computation starts from three base sets: the previous assignment tracked by this
     * assignor, the connectors and tasks found in {@code configSnapshot}, and the assignments
     * currently reported in {@code memberAssignments}. From these it derives the deleted,
     * duplicated, lost and newly-created sets. Deleted and duplicated connectors and tasks are
     * always queued for revocation; load-balancing revocations are only computed while
     * {@code delay} is zero, and may be skipped for the round when the backoff for consecutive
     * revoking rebalances yields a non-zero delay.
     *
     * <p>Side effects on assignor state: {@code previousAssignment}, {@code previousGenerationId}
     * and {@code previousMembers} are always overwritten before returning. Depending on the path
     * taken, {@code delay}, {@code scheduledRebalance}, {@code revokedInPrevious} and
     * {@code numSuccessiveRevokingRebalances} are updated, and the connector and task collections
     * of {@code previousRevocation} are cleared. The helpers invoked here (for example
     * {@code handleLostAssignments}) are not visible in this method and may update further state.
     *
     * @param configSnapshot the cluster configuration from which the configured connectors and
     *                       their tasks are read
     * @param lastCompletedGenerationId the generation ID compared against the stored previous
     *                                  generation ID; on mismatch the stored previous assignment
     *                                  is discarded
     * @param currentGenerationId the generation ID stored as the previous generation ID once this
     *                            round has been computed
     * @param memberAssignments the connectors and tasks currently held by each member of the
     *                          group, keyed by member (presumably the worker ID; verify against
     *                          the caller)
     * @return a {@link ClusterAssignment} built from the incremental connector and task
     *         assignments, the revoked connectors and tasks, and the next connector and task
     *         assignments with the revoked ones subtracted
     */
    ClusterAssignment performTaskAssignment(
            ClusterConfigState configSnapshot,
            int lastCompletedGenerationId,
            int currentGenerationId,
            Map<String, ConnectorsAndTasks> memberAssignments
    ) {
        // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
        // can be used to calculate derived sets
        log.debug("Previous assignments: {}", previousAssignment);
        // If the generation recorded by the last run of this method is not the one that was last
        // completed, the stored snapshot cannot be trusted and is replaced with the empty set
        if (previousGenerationId != lastCompletedGenerationId) {
            log.debug("Clearing the view of previous assignments due to generation mismatch between "
                    + "previous generation ID {} and last completed generation ID {}. This can "
                    + "happen if the leader fails to sync the assignment within a rebalancing round. "
                    + "The following view of previous assignments might be outdated and will be "
                    + "ignored by the leader in the current computation of new assignments. "
                    + "Possibly outdated previous assignments: {}",
                    previousGenerationId, lastCompletedGenerationId, previousAssignment);
            this.previousAssignment = ConnectorsAndTasks.EMPTY;
        }

        // Copy the configured connector names into a sorted set, then gather the tasks of every
        // configured connector
        Set<String> configuredConnectors = new TreeSet<>(configSnapshot.connectors());
        Set<ConnectorTaskId> configuredTasks = combineCollections(configuredConnectors, configSnapshot::tasks, Collectors.toSet());

        // Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
        // be used to calculate derived sets
        ConnectorsAndTasks configured = new ConnectorsAndTasks.Builder()
                .with(configuredConnectors, configuredTasks).build();
        log.debug("Configured assignments: {}", configured);

        // Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
        // used to calculate derived sets
        ConnectorsAndTasks activeAssignments = assignment(memberAssignments);
        log.debug("Active assignments: {}", activeAssignments);

        // A recorded revocation that still overlaps with the active assignments means that a
        // previous revocation did not take effect. In this case, reset the previous assignment to
        // the active assignments and be ready to re-apply revocation of tasks. The recorded
        // revocation is cleared whether or not an overlap was found.
        if (!previousRevocation.isEmpty()) {
            if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
                    || previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
                previousAssignment = activeAssignments;
            }
            previousRevocation.connectors().clear();
            previousRevocation.tasks().clear();
        }

        // Derived set: The set of deleted connectors-and-tasks is a derived set from the set
        // difference of previous - configured
        ConnectorsAndTasks deleted = diff(previousAssignment, configured);
        log.debug("Deleted assignments: {}", deleted);

        // The connectors and tasks that are currently running on more than one worker each
        ConnectorsAndTasks duplicated = duplicatedAssignments(memberAssignments);
        log.trace("Duplicated assignments: {}", duplicated);

        // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
        // the set difference of previous - active - deleted
        ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
        log.debug("Lost assignments: {}", lostAssignments);

        // Derived set: The set of new connectors-and-tasks is a derived set from the set
        // difference of configured - previous - active
        ConnectorsAndTasks created = diff(configured, previousAssignment, activeAssignments);
        log.debug("Created: {}", created);

        // A collection of the current assignment excluding the connectors-and-tasks to be deleted.
        // It is the baseline against which the incremental assignments are computed further down.
        List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberAssignments, deleted);

        // Accumulates every revocation decided in this round, keyed the same way as the
        // per-worker maps that are added to it
        Map<String, ConnectorsAndTasks.Builder> toRevoke = new HashMap<>();

        // Revoking connectors/tasks that are no longer configured from the workers that hold them
        Map<String, ConnectorsAndTasks> deletedToRevoke = intersection(deleted, memberAssignments);
        log.debug("Deleted connectors and tasks to revoke from each worker: {}", deletedToRevoke);
        addAll(toRevoke, deletedToRevoke);

        // Revoking redundant connectors/tasks if the workers have duplicate assignments
        Map<String, ConnectorsAndTasks> duplicatedToRevoke = intersection(duplicated, memberAssignments);
        log.debug("Duplicated connectors and tasks to revoke from each worker: {}", duplicatedToRevoke);
        addAll(toRevoke, duplicatedToRevoke);

        // Compute the assignment that will be applied across the cluster after this round of rebalance
        // Later on, new submissions and lost-and-reassigned connectors and tasks will be added to these assignments,
        // and load-balancing revocations will be removed from them.
        List<WorkerLoad> nextWorkerAssignment = workerLoads(memberAssignments);
        removeAll(nextWorkerAssignment, deletedToRevoke);
        removeAll(nextWorkerAssignment, duplicatedToRevoke);

        // Collect the lost assignments that are ready to be reassigned because the workers that were
        // originally responsible for them appear to have left the cluster instead of rejoining within
        // the scheduled rebalance delay. These assignments will be re-allocated to the existing workers
        // in the cluster later on
        // NOTE(review): handleLostAssignments is not visible here; it presumably also maintains
        // delay/scheduledRebalance, which is why it runs before the delay check below - confirm
        ConnectorsAndTasks.Builder lostAssignmentsToReassignBuilder = new ConnectorsAndTasks.Builder();
        handleLostAssignments(lostAssignments, lostAssignmentsToReassignBuilder, nextWorkerAssignment);
        ConnectorsAndTasks lostAssignmentsToReassign = lostAssignmentsToReassignBuilder.build();

        // Do not revoke resources for re-assignment while a delayed rebalance is active
        if (delay == 0) {
            Map<String, ConnectorsAndTasks> loadBalancingRevocations =
                    performLoadBalancingRevocations(configured, nextWorkerAssignment);

            // If this round and the previous round involved revocation, we will calculate a delay for
            // the next round when revoking rebalance would be allowed. Note that delay could be 0, in which
            // case we would always revoke.
            if (revokedInPrevious && !loadBalancingRevocations.isEmpty()) {
                numSuccessiveRevokingRebalances++; // Should we consider overflow for this?
                log.debug("Consecutive revoking rebalances observed. Computing delay and next scheduled rebalance.");
                // The backoff result is narrowed to int before being stored in delay
                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
                if (delay != 0) {
                    // The load-balancing revocations computed above are intentionally dropped for
                    // this round; only the time of the next scheduled rebalance is recorded
                    scheduledRebalance = time.milliseconds() + delay;
                    log.debug("Skipping revocations in the current round with a delay of {}ms. Next scheduled rebalance:{}",
                            delay, scheduledRebalance);
                } else {
                    log.debug("Revoking assignments immediately since scheduled.rebalance.max.delay.ms is set to 0");
                    addAll(toRevoke, loadBalancingRevocations);
                    // Remove all newly-revoked connectors and tasks from the next assignment, both to
                    // ensure that they are not included in the assignments during this round, and to produce
                    // an accurate allocation of all newly-created and lost-and-reassigned connectors and tasks
                    // that will have to be distributed across the cluster during this round
                    removeAll(nextWorkerAssignment, loadBalancingRevocations);
                }
            } else if (!loadBalancingRevocations.isEmpty()) {
                // We had a revocation in this round but not in the previous round. Let's store that state.
                log.debug("Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance");
                addAll(toRevoke, loadBalancingRevocations);
                removeAll(nextWorkerAssignment, loadBalancingRevocations);
                revokedInPrevious = true;
            } else if (revokedInPrevious) {
                // No revocations in this round but the previous round had one. Probably the workers
                // have converged to a balanced load. We can reset the rebalance clock
                log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a " +
                        "balanced load. Resetting the exponential backoff clock");
                revokedInPrevious = false;
                numSuccessiveRevokingRebalances = 0;
            } else {
                // no-op
                log.debug("No revocations in previous and current round.");
            }
        } else {
            // No load-balancing revocations are computed in this branch; toRevoke only holds the
            // deleted and duplicated revocations gathered above. The round is recorded as
            // non-revoking for the consecutive-revocation check of the next round.
            log.debug("Delayed rebalance is active. Delaying {}ms before revoking connectors and tasks: {}", delay, toRevoke);
            revokedInPrevious = false;
        }

        // The complete set of connectors and tasks that should be newly-assigned during this round
        ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
                .addAll(created)
                .addAll(lostAssignmentsToReassign)
                .build();

        // Distribute the new work over the next assignment, which by now excludes everything
        // that was queued for revocation above
        assignConnectors(nextWorkerAssignment, toAssign.connectors());
        assignTasks(nextWorkerAssignment, toAssign.tasks());

        // Collectors.toMap without a merge function throws IllegalStateException on duplicate
        // keys, so each worker is expected to appear at most once in the worker load lists
        Map<String, Collection<String>> nextConnectorAssignments = nextWorkerAssignment.stream()
                .collect(Collectors.toMap(
                        WorkerLoad::worker,
                        WorkerLoad::connectors
                ));
        Map<String, Collection<ConnectorTaskId>> nextTaskAssignments = nextWorkerAssignment.stream()
                .collect(Collectors.toMap(
                        WorkerLoad::worker,
                        WorkerLoad::tasks
                ));

        // Incremental assignments: what each worker has in the next assignment that it does not
        // already have in the current one (current already excludes the deleted set)
        Map<String, Collection<String>> currentConnectorAssignments =
                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
        Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
        Map<String, Collection<String>> incrementalConnectorAssignments =
                diff(nextConnectorAssignments, currentConnectorAssignments);
        Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                diff(nextTaskAssignments, currentTaskAssignments);

        // Freeze the accumulated per-worker revocation builders into their final form
        Map<String, ConnectorsAndTasks> revoked = buildAll(toRevoke);

        // Record the state that the next invocation of this method will treat as "previous"
        previousAssignment = computePreviousAssignment(revoked, nextConnectorAssignments, nextTaskAssignments, lostAssignments);
        previousGenerationId = currentGenerationId;
        // NOTE(review): Map.keySet() returns a view backed by the caller's map, not a copy;
        // confirm that memberAssignments is not mutated after this method returns
        previousMembers = memberAssignments.keySet();

        log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
        log.debug("Incremental task assignments: {}", incrementalTaskAssignments);

        Map<String, Collection<String>> revokedConnectors = transformValues(revoked, ConnectorsAndTasks::connectors);
        Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(revoked, ConnectorsAndTasks::tasks);

        // The last two arguments are the next assignments with the revoked connectors and tasks
        // subtracted (presumably a per-worker difference; verify against the map overload of diff)
        return new ClusterAssignment(
                incrementalConnectorAssignments,
                incrementalTaskAssignments,
                revokedConnectors,
                revokedTasks,
                diff(nextConnectorAssignments, revokedConnectors),
                diff(nextTaskAssignments, revokedTasks)
        );
    }