private CompletableFuture createTablePartitionsLocally()

in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java [703:936]


    /**
     * Creates the local state of the given table's partitions for the given causality token.
     *
     * <p>Two versioned-value updates are registered synchronously from this method:
     * <ul>
     *     <li>{@code localPartsByTableIdVv}: once the assignments are known, partition storages are obtained (or created) for every
     *     partition index of the assignments list, and the resulting partition set is stored under the table ID;</li>
     *     <li>{@code assignmentsUpdatedVv}: after the update above and {@code tablesByIdVv} for the same token complete, every
     *     partition is processed on {@code ioExecutor}. The placement driver is told the partition's peers, and trackers,
     *     storages and update handlers are set up. The GC storage is registered, and a Raft node is started when the local node
     *     is among the partition's assignments. A Raft group service is then started and, if a local Raft node was started,
     *     a replica is started as well.</li>
     * </ul>
     *
     * @param causalityToken Causality token of the versioned-value updates; also used to obtain the schema registry.
     * @param assignmentsFuture Future with the assignments of every partition, indexed by partition ID.
     * @param zoneId Distribution zone ID. Not used by this method at present.
     * @param table Table whose partitions are created.
     * @return Future of the {@code assignmentsUpdatedVv} update for {@code causalityToken}.
     */
    private CompletableFuture<?> createTablePartitionsLocally(
            long causalityToken,
            CompletableFuture<List<Set<Assignment>>> assignmentsFuture,
            int zoneId,
            TableImpl table
    ) {
        int tableId = table.tableId();

        // Create new raft nodes according to new assignments.
        Supplier<CompletableFuture<Void>> updateAssignmentsClosure = () -> assignmentsFuture.thenCompose(newAssignments -> {
            // Empty assignments might be a valid case if tables are created from within cluster init HOCON
            // configuration, which is not supported now.
            assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);

            int partitions = newAssignments.size();

            // One future per partition, completed by the whenComplete() at the end of that partition's chain.
            CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];

            for (int i = 0; i < partitions; i++) {
                futures[i] = new CompletableFuture<>();
            }

            String localMemberName = localNode().name();

            // table.internalTable() takes no per-partition input, so it is read once instead of on every iteration.
            InternalTable internalTbl = table.internalTable();

            for (int i = 0; i < partitions; i++) {
                int partId = i;

                Set<Assignment> newPartAssignment = newAssignments.get(partId);

                Assignment localMemberAssignment = newPartAssignment.stream()
                        .filter(a -> a.consistentId().equals(localMemberName))
                        .findAny()
                        .orElse(null);

                PeersAndLearners newConfiguration = configurationFromAssignments(newPartAssignment);

                TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);

                placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId)
                        .collect(toList()));

                var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(
                        new HybridTimestamp(1, 0)
                );
                var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);

                ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);

                PartitionStorages partitionStorages = getPartitionStorages(table, partId);

                PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(),
                        internalTbl, partId);

                PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers(
                        partId,
                        partitionDataStorage,
                        table,
                        safeTimeTracker
                );

                mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler);

                // Completes with true only if a Raft node for this partition was started on the local node.
                CompletableFuture<Boolean> startGroupFut;

                // start new nodes, only if it is table creation, other cases will be covered by rebalance logic
                if (localMemberAssignment != null) {
                    CompletableFuture<Boolean> shouldStartGroupFut;

                    // If Raft is running in in-memory mode or the PDS has been cleared, we need to remove the current node
                    // from the Raft group in order to avoid the double vote problem.
                    // <MUTED> See https://issues.apache.org/jira/browse/IGNITE-16668 for details.
                    // TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
                    if (internalTbl.storage().isVolatile()) {
                        shouldStartGroupFut = queryDataNodesCount(tableId, partId, newConfiguration.peers())
                                .thenApply(dataNodesCount -> {
                                    boolean fullPartitionRestart = dataNodesCount == 0;

                                    if (fullPartitionRestart) {
                                        return true;
                                    }

                                    // Strict majority of the configured peers: floor(n / 2) + 1.
                                    boolean majorityAvailable = dataNodesCount >= (newConfiguration.peers().size() / 2) + 1;

                                    if (majorityAvailable) {
                                        RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, metaStorageMgr);

                                        return false;
                                    } else {
                                        // No majority and not a full partition restart - need to restart nodes
                                        // with current partition.
                                        String msg = "Unable to start partition " + partId + ". Majority not available.";

                                        throw new IgniteInternalException(msg);
                                    }
                                });
                    } else {
                        shouldStartGroupFut = completedFuture(true);
                    }

                    startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
                        if (!startGroup) {
                            return false;
                        }
                        TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();

                        RaftGroupOptions groupOptions = groupOptionsForPartition(
                                internalTbl.storage(),
                                internalTbl.txStateStorage(),
                                partitionKey(internalTbl, partId),
                                partitionUpdateHandlers
                        );

                        Peer serverPeer = newConfiguration.peer(localMemberName);

                        var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);

                        try {
                            // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273
                            ((Loza) raftMgr).startRaftGroupNode(
                                    raftNodeId,
                                    newConfiguration,
                                    new PartitionListener(
                                            partitionDataStorage,
                                            partitionUpdateHandlers.storageUpdateHandler,
                                            txStatePartitionStorage,
                                            safeTimeTracker,
                                            storageIndexTracker
                                    ),
                                    new RebalanceRaftGroupEventsListener(
                                            metaStorageMgr,
                                            replicaGrpId,
                                            busyLock,
                                            createPartitionMover(internalTbl, partId),
                                            this::calculateAssignments,
                                            rebalanceScheduler
                                    ),
                                    groupOptions
                            );

                            return true;
                        } catch (NodeStoppingException ex) {
                            throw new CompletionException(ex);
                        }
                    }), ioExecutor);
                } else {
                    startGroupFut = completedFuture(false);
                }

                startGroupFut
                        .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
                            try {
                                //TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online.
                                return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory);
                            } catch (NodeStoppingException ex) {
                                return failedFuture(ex);
                            }
                        }), ioExecutor)
                        .thenAcceptAsync(updatedRaftGroupService -> inBusyLock(busyLock, () -> {
                            ((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService);

                            // startGroupFut is a predecessor of this stage, so it has already completed and join() does not block.
                            boolean startedRaftNode = startGroupFut.join();
                            if (localMemberAssignment == null || !startedRaftNode) {
                                return;
                            }

                            MvPartitionStorage partitionStorage = partitionStorages.getMvPartitionStorage();
                            TxStateStorage txStateStorage = partitionStorages.getTxStateStorage();

                            try {
                                startReplicaWithNewListener(
                                        replicaGrpId,
                                        table,
                                        safeTimeTracker,
                                        storageIndexTracker,
                                        partitionStorage,
                                        txStateStorage,
                                        partitionUpdateHandlers,
                                        updatedRaftGroupService,
                                        schemaManager.schemaRegistry(causalityToken, tableId)
                                );
                            } catch (NodeStoppingException ex) {
                                throw new AssertionError("Loza was stopped before Table manager", ex);
                            }
                        }), ioExecutor)
                        .whenComplete((res, ex) -> {
                            if (ex != null) {
                                LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId);

                                futures[partId].completeExceptionally(ex);
                            } else {
                                futures[partId].complete(null);
                            }
                        });
            }

            return allOf(futures);
        });

        // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!).
        CompletableFuture<?> localPartsUpdateFuture = localPartsByTableIdVv.update(causalityToken, (previous, throwable) -> {
            // Propagate a failure of the preceding update as is: in that case `previous` cannot be relied upon, and
            // copying it with new HashMap<>(previous) would fail with an NPE that hides the original cause.
            if (throwable != null) {
                return failedFuture(throwable);
            }

            return inBusyLock(busyLock, () -> assignmentsFuture.thenCompose(newAssignments -> {
                // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for assigned partitions.
                PartitionSet parts = new BitSetPartitionSet();

                for (int i = 0; i < newAssignments.size(); i++) {
                    parts.set(i);
                }

                return getOrCreatePartitionStorages(table, parts).thenApply(u -> {
                    var newValue = new HashMap<>(previous);

                    newValue.put(tableId, parts);

                    return newValue;
                });
            }));
        });

        return assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
            if (e != null) {
                return failedFuture(e);
            }

            return localPartsUpdateFuture.thenCompose(unused ->
                    tablesByIdVv.get(causalityToken)
                            .thenComposeAsync(tablesById -> inBusyLock(busyLock, updateAssignmentsClosure), ioExecutor)
            );
        });
    }