protected Map performTaskAssignment()

in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java [157:315]


    /**
     * Computes the assignment of connectors and tasks for the current rebalance round and
     * returns it in serialized form.
     *
     * <p>The computation starts from three base sets (the previous assignment remembered by this
     * assignor, the configured connectors-and-tasks from the coordinator's config snapshot, and
     * the active assignments reported in {@code memberConfigs}) and derives from them the
     * deleted, lost and newly submitted connectors-and-tasks. It then determines what each worker
     * must revoke (deleted, duplicated and, when allowed, load-balancing revocations) and what
     * each worker is incrementally assigned.
     *
     * <p>Besides returning the result, this method updates the assignor's state:
     * {@code previousAssignment}, {@code previousRevocation} (cleared), {@code canRevoke},
     * {@code previousGenerationId} and {@code previousMembers}; and it publishes a new
     * {@code LeaderState} to the {@code coordinator}.
     *
     * @param leaderId the id of the leader; it must be a key of {@code memberConfigs} because the
     *                 leader's URL is read from that entry without a null check
     * @param maxOffset the offset value forwarded unchanged to the produced assignments
     * @param memberConfigs the state reported by each member of the group, keyed by member id
     * @param coordinator the coordinator that supplies the generation ids, the member id and the
     *                    configuration snapshot, and that receives the new leader state
     * @param protocolVersion the protocol version forwarded unchanged to the produced assignments
     * @return the serialized assignment of every member in {@code memberConfigs}
     */
    protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,
                                                            Map<String, ExtendedWorkerState> memberConfigs,
                                                            WorkerCoordinator coordinator, short protocolVersion) {
        log.debug("Performing task assignment during generation: {} with memberId: {}",
                coordinator.generationId(), coordinator.memberId());

        // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
        // can be used to calculate derived sets
        log.debug("Previous assignments: {}", previousAssignment);
        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
        if (previousGenerationId != lastCompletedGenerationId) {
            log.debug("Clearing the view of previous assignments due to generation mismatch between "
                    + "previous generation ID {} and last completed generation ID {}. This can "
                    + "happen if the leader fails to sync the assignment within a rebalancing round. "
                    + "The following view of previous assignments might be outdated and will be "
                    + "ignored by the leader in the current computation of new assignments. "
                    + "Possibly outdated previous assignments: {}",
                    previousGenerationId, lastCompletedGenerationId, previousAssignment);
            this.previousAssignment = ConnectorsAndTasks.EMPTY;
        }

        ClusterConfigState snapshot = coordinator.configSnapshot();
        Set<String> configuredConnectors = new TreeSet<>(snapshot.connectors());
        Set<ConnectorTaskId> configuredTasks = configuredConnectors.stream()
                .flatMap(c -> snapshot.tasks(c).stream())
                .collect(Collectors.toSet());

        // Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
        // be used to calculate derived sets
        ConnectorsAndTasks configured = new ConnectorsAndTasks.Builder()
                .with(configuredConnectors, configuredTasks).build();
        log.debug("Configured assignments: {}", configured);

        // Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
        // used to calculate derived sets
        ConnectorsAndTasks activeAssignments = assignment(memberConfigs);
        log.debug("Active assignments: {}", activeAssignments);

        // This means that a previous revocation did not take effect. In this case, reset
        // appropriately and be ready to re-apply revocation of tasks
        if (!previousRevocation.isEmpty()) {
            if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
                    || previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
                previousAssignment = activeAssignments;
                canRevoke = true;
            }
            // The remembered revocation is consumed either way, whether or not it took effect
            previousRevocation.connectors().clear();
            previousRevocation.tasks().clear();
        }

        // Derived set: The set of deleted connectors-and-tasks is a derived set from the set
        // difference of previous - configured
        ConnectorsAndTasks deleted = diff(previousAssignment, configured);
        log.debug("Deleted assignments: {}", deleted);

        // Derived set: The set of remaining active connectors-and-tasks is a derived set from the
        // set difference of active - deleted
        ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
        log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);

        // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
        // the set difference of previous - active - deleted
        ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
        log.debug("Lost assignments: {}", lostAssignments);

        // Derived set: The set of new connectors-and-tasks is a derived set from the set
        // difference of configured - previous - active
        ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
        log.debug("New assignments: {}", newSubmissions);

        // A collection of the complete assignment
        List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
        log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);

        // Per worker connector assignments without removing deleted connectors yet
        Map<String, Collection<String>> connectorAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
        log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);

        // Per worker task assignments without removing deleted connectors yet
        Map<String, Collection<ConnectorTaskId>> taskAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
        log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments);

        // A collection of the current assignment excluding the connectors-and-tasks to be deleted
        List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberConfigs, deleted);

        Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
        log.debug("Connector and task to delete assignments: {}", toRevoke);

        // Revoking redundant connectors/tasks if the workers have duplicate assignments.
        // The duplicates are merged into the per-worker entry instead of replacing it, so that a
        // worker that holds both deleted and duplicated connectors/tasks revokes all of them.
        computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments).forEach(
            (worker, duplicated) -> {
                ConnectorsAndTasks existing = toRevoke.get(worker);
                if (existing == null) {
                    toRevoke.put(worker, duplicated);
                    return;
                }
                // Skip anything already scheduled for revocation to avoid repeated entries
                duplicated.connectors().stream()
                        .filter(c -> !existing.connectors().contains(c))
                        .forEach(existing.connectors()::add);
                duplicated.tasks().stream()
                        .filter(t -> !existing.tasks().contains(t))
                        .forEach(existing.tasks()::add);
            }
        );
        log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);

        // Recompute the complete assignment excluding the deleted connectors-and-tasks
        completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
        connectorAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
        taskAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));

        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);

        // Do not revoke resources for re-assignment while a delayed rebalance is active
        // Also we do not revoke in two consecutive rebalances by the same leader
        canRevoke = delay == 0 && canRevoke;

        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
        // account the deleted ones.
        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
        if (canRevoke) {
            Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
                    performTaskRevocation(activeAssignments, currentWorkerAssignment);

            toExplicitlyRevoke.forEach(
                (worker, assignment) -> {
                    ConnectorsAndTasks existing = toRevoke.computeIfAbsent(
                        worker,
                        v -> new ConnectorsAndTasks.Builder().build());
                    existing.connectors().addAll(assignment.connectors());
                    existing.tasks().addAll(assignment.tasks());
                }
            );

            // Logged after the merge so that the load-balancing revocations are included
            log.debug("Connector and task to revoke assignments: {}", toRevoke);

            // Revocation stays enabled for the next round only if nothing was revoked in this one
            canRevoke = toExplicitlyRevoke.isEmpty();
        } else {
            canRevoke = delay == 0;
        }

        assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
        assignTasks(completeWorkerAssignment, newSubmissions.tasks());
        log.debug("Current complete assignments: {}", currentWorkerAssignment);
        log.debug("New complete assignments: {}", completeWorkerAssignment);

        Map<String, Collection<String>> currentConnectorAssignments =
                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
        Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
        // The incremental assignment is what each worker receives on top of what it currently runs
        Map<String, Collection<String>> incrementalConnectorAssignments =
                diff(connectorAssignments, currentConnectorAssignments);
        Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                diff(taskAssignments, currentTaskAssignments);

        log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
        log.debug("Incremental task assignments: {}", incrementalTaskAssignments);

        coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments));

        Map<String, ExtendedAssignment> assignments =
                fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
                                memberConfigs.get(leaderId).url(), maxOffset, incrementalConnectorAssignments,
                                incrementalTaskAssignments, toRevoke, delay, protocolVersion);
        // Remember the outcome of this round as the base for the next computation
        previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
        previousGenerationId = coordinator.generationId();
        previousMembers = memberConfigs.keySet();
        log.debug("Actual assignments: {}", assignments);
        return serializeAssignments(assignments);
    }