private CompletableFuture handleChangePendingAssignmentEvent()

in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java [2194:2324]


    /**
     * Handles an update of the pending assignments of a single table partition.
     *
     * <p>The method performs the following steps:
     * <ol>
     *     <li>deserializes the pending and the stable assignments and converts both to Raft configurations;</li>
     *     <li>passes the consistent ids of the stable peers to the placement driver;</li>
     *     <li>if the local node is present in the pending assignments with an assignment that the stable set does not contain,
     *     prepares the partition storages and starts the partition Raft node and the replica;</li>
     *     <li>re-reads the pending assignments entry from the meta storage and, unless a newer revision is found there,
     *     asks the Raft group to change its peers to the pending configuration, but only if the local node is the group leader.</li>
     * </ol>
     *
     * @param causalityToken Token used to read the local partitions map from {@code localPartsByTableIdVv}.
     * @param replicaGrpId Replication group id of the partition the event refers to.
     * @param tbl Table the partition belongs to.
     * @param pendingAssignmentsEntry Entry whose value contains the serialized pending assignments.
     * @param stableAssignmentsEntry Entry whose value contains the serialized stable assignments.
     * @return Future that completes once the optional local start and the optional change peers request are done.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(
            long causalityToken,
            TablePartitionId replicaGrpId,
            TableImpl tbl,
            Entry pendingAssignmentsEntry,
            Entry stableAssignmentsEntry
    ) {
        // Pending rebalance assignments delivered by the Meta storage watch, and the Raft configuration built from them.
        Set<Assignment> pending = ByteUtils.fromBytes(pendingAssignmentsEntry.value());
        PeersAndLearners targetConfiguration = configurationFromAssignments(pending);

        int tableId = tbl.tableId();
        int partId = replicaGrpId.partitionId();

        // Stable assignments and the Raft configuration built from them.
        Set<Assignment> stable = ByteUtils.fromBytes(stableAssignmentsEntry.value());
        PeersAndLearners currentConfiguration = configurationFromAssignments(stable);

        // The placement driver is given the consistent ids of the stable peers only.
        placementDriver.updateAssignment(
                replicaGrpId,
                currentConfiguration.peers().stream().map(Peer::consistentId).collect(toList())
        );

        ClusterNode thisNode = localNode();

        // Local Raft node and Replica are started only when this node has an assignment in the pending set
        // that is not a part of the stable set.
        boolean startLocally = pending.stream()
                .anyMatch(candidate -> thisNode.name().equals(candidate.consistentId()) && !stable.contains(candidate));

        PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
        PendingComparableValuesTracker<Long, Void> storageIndex = new PendingComparableValuesTracker<>(0L);

        InternalTable internalTbl = tbl.internalTable();

        LOG.info("Received update on pending assignments. Check if new raft group should be started"
                        + " [key={}, partition={}, table={}, localMemberAddress={}]",
                new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8), partId, tbl.name(), thisNode.address());

        CompletableFuture<Void> servicesStarted;

        if (!startLocally) {
            // Nothing to start on this node.
            servicesStarted = completedFuture(null);
        } else {
            servicesStarted = localPartsByTableIdVv.get(causalityToken)
                    .thenComposeAsync(partsByTable -> {
                        PartitionSet parts = partsByTable.get(tableId).copy();

                        // The copied map is the result of this stage; the next stage does not read it.
                        return getOrCreatePartitionStorages(tbl, parts).thenApply(created -> {
                            var updatedParts = new HashMap<>(partsByTable);

                            updatedParts.put(tableId, parts);

                            return updatedParts;
                        });
                    })
                    .thenComposeAsync(storagesReady -> {
                        PartitionStorages storages = getPartitionStorages(tbl, partId);

                        MvPartitionStorage mvStorage = storages.getMvPartitionStorage();
                        TxStateStorage txStateStorage = storages.getTxStateStorage();

                        PartitionDataStorage dataStorage = partitionDataStorage(mvStorage, internalTbl, partId);

                        PartitionUpdateHandlers updateHandlers = createPartitionUpdateHandlers(
                                partId,
                                dataStorage,
                                tbl,
                                safeTime
                        );

                        // The actual start is executed on the IO executor under the busy lock.
                        return runAsync(() -> inBusyLock(busyLock, () -> {
                            try {
                                startPartitionRaftGroupNode(
                                        replicaGrpId,
                                        targetConfiguration,
                                        currentConfiguration,
                                        safeTime,
                                        storageIndex,
                                        internalTbl,
                                        txStateStorage,
                                        dataStorage,
                                        updateHandlers
                                );

                                // Looked up only after the Raft node start call above.
                                // NOTE(review): presumably that call is what makes the service available - confirm.
                                TopologyAwareRaftGroupService raftSvc =
                                        (TopologyAwareRaftGroupService) internalTbl.partitionRaftGroupService(partId);

                                startReplicaWithNewListener(
                                        replicaGrpId,
                                        tbl,
                                        safeTime,
                                        storageIndex,
                                        mvStorage,
                                        txStateStorage,
                                        updateHandlers,
                                        raftSvc,
                                        completedFuture(schemaManager.schemaRegistry(tableId))
                                );
                            } catch (NodeStoppingException ignored) {
                                // No-op.
                            }
                        }), ioExecutor);
                    });
        }

        return servicesStarted
                .thenCompose(started -> metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)))
                .thenCompose(latestEntry -> {
                    // A newer revision in the meta storage means this event is stale, so the peers must not be changed.
                    // The raft node is nevertheless started above for the sake of the consistency in starting and stopping raft nodes.
                    if (latestEntry.revision() > pendingAssignmentsEntry.revision()) {
                        return completedFuture(null);
                    }

                    RaftGroupService raftClient = internalTbl.partitionRaftGroupService(partId);

                    return raftClient.refreshAndGetLeaderWithTerm()
                            .thenCompose(leaderAndTerm -> {
                                // Only the leader initiates the change of the raft configuration.
                                if (!isLocalPeer(leaderAndTerm.leader())) {
                                    return completedFuture(null);
                                }

                                LOG.info("Current node={} is the leader of partition raft group={}. "
                                                + "Initiate rebalance process for partition={}, table={}",
                                        thisNode.address(), replicaGrpId, partId, tbl.name());

                                return raftClient.changePeersAsync(targetConfiguration, leaderAndTerm.term());
                            });
                });
    }