private Int2ObjectOpenHashMap startTable()

in modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java [410:578]


    /**
     * Starts the raft group nodes and partition replicas of a table on every assigned cluster node and creates one raft client
     * per partition.
     *
     * <p>The method blocks while each raft client starts (every start is awaited for up to 5 seconds) and, at the end, until all
     * replica start futures complete.
     *
     * @param tblId Table id used to build the {@link TablePartitionId} of every partition.
     * @param schemaDescriptor Schema descriptor used for the primary key extractor and for the dummy schema manager.
     * @return Raft clients keyed by partition index.
     * @throws Exception If starting a raft node or a raft client fails or times out.
     */
    private Int2ObjectOpenHashMap<RaftGroupService> startTable(int tblId, SchemaDescriptor schemaDescriptor) throws Exception {
        // NOTE(review): the literal 1 is presumably the partition count - confirm against AffinityUtils#calculateAssignments.
        List<Set<Assignment>> calculatedAssignments = AffinityUtils.calculateAssignments(
                cluster.stream().map(node -> node.topologyService().localMember().name()).collect(toList()),
                1,
                replicas()
        );

        // Per partition: consistent ids of the nodes the partition is assigned to.
        List<Set<String>> assignments = calculatedAssignments.stream()
                .map(a -> a.stream().map(Assignment::consistentId).collect(toSet()))
                .collect(toList());

        List<TablePartitionId> grpIds = IntStream.range(0, assignments.size())
                .mapToObj(i -> new TablePartitionId(tblId, i))
                .collect(toList());

        Int2ObjectOpenHashMap<RaftGroupService> clients = new Int2ObjectOpenHashMap<>();

        List<CompletableFuture<Void>> partitionReadyFutures = new ArrayList<>();

        // Incremented for every replica, so each replica of each partition gets its own index id.
        int globalIndexId = 1;

        for (int p = 0; p < assignments.size(); p++) {
            Set<String> partAssignments = assignments.get(p);

            TablePartitionId grpId = grpIds.get(p);

            // Effectively final copy of the loop counter, required by the lambdas below.
            int partId = p;

            // The group configuration depends only on the partition, so it is built once and shared by all replicas and the client.
            PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(partAssignments);

            for (String assignment : partAssignments) {
                var mvTableStorage = new TestMvTableStorage(tblId, DEFAULT_PARTITION_COUNT);
                // Created with the real partition id so that it agrees with the id passed to the update handler and the replica listener.
                var mvPartStorage = new TestMvPartitionStorage(partId);
                var txStateStorage = txStateStorages.get(assignment);
                var placementDriver = new PlacementDriver(replicaServices.get(assignment), consistentIdToNode);

                // Every placement driver is told about the assignments of all partitions, not only the current one.
                for (int part = 0; part < assignments.size(); part++) {
                    placementDriver.updateAssignment(grpIds.get(part), assignments.get(part));
                }

                int indexId = globalIndexId++;

                Function<BinaryRow, BinaryTuple> row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);

                Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
                        indexId,
                        new TestHashIndexStorage(partId, null),
                        row2Tuple
                ));

                IndexLocker pkLocker = new HashIndexLocker(indexId, true, txManagers.get(assignment).lockManager(), row2Tuple);

                PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
                        new PendingComparableValuesTracker<>(clocks.get(assignment).now());
                PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);

                PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage);

                IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
                        DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()))
                );

                StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
                        partId,
                        partitionDataStorage,
                        gcConfig,
                        mock(LowWatermark.class),
                        indexUpdateHandler,
                        new GcUpdateHandler(partitionDataStorage, safeTime, indexUpdateHandler)
                );

                TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
                        clusterServices.get(assignment),
                        logicalTopologyService(clusterServices.get(assignment)),
                        Loza.FACTORY,
                        new RaftGroupEventsClientListener()
                );

                // The replica is started from the callback, i.e. only after the raft node start future completes.
                CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).startRaftGroupNode(
                        new RaftNodeId(grpId, configuration.peer(assignment)),
                        configuration,
                        new PartitionListener(
                                partitionDataStorage,
                                storageUpdateHandler,
                                txStateStorage,
                                safeTime,
                                storageIndexTracker
                        ),
                        RaftGroupEventsListener.noopLsnr,
                        topologyAwareRaftGroupServiceFactory
                ).thenAccept(
                        raftSvc -> {
                            try {
                                DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor);
                                replicaManagers.get(assignment).startReplica(
                                        new TablePartitionId(tblId, partId),
                                        completedFuture(null),
                                        new PartitionReplicaListener(
                                                mvPartStorage,
                                                raftSvc,
                                                txManagers.get(assignment),
                                                txManagers.get(assignment).lockManager(),
                                                Runnable::run,
                                                partId,
                                                tblId,
                                                () -> Map.of(pkLocker.id(), pkLocker),
                                                pkStorage,
                                                Map::of,
                                                clocks.get(assignment),
                                                safeTime,
                                                txStateStorage,
                                                placementDriver,
                                                storageUpdateHandler,
                                                new DummySchemas(schemaManager),
                                                completedFuture(schemaManager),
                                                consistentIdToNode.apply(assignment),
                                                mvTableStorage,
                                                mock(IndexBuilder.class),
                                                tablesConfig
                                        ),
                                        raftSvc,
                                        storageIndexTracker
                                );
                            } catch (NodeStoppingException e) {
                                fail("Unexpected node stopping", e);
                            }
                        }
                );

                partitionReadyFutures.add(partitionReadyFuture);
            }

            clients.put(p, startPartitionRaftClient(grpId, configuration));
        }

        // Wait for every replica start callback; a failure in any of them is rethrown here by join().
        CompletableFuture.allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();

        return clients;
    }

    /**
     * Starts the raft client of a single partition.
     *
     * <p>If {@code startClient()} returns {@code true}, the client is started on the {@code client} field. Otherwise a temporary
     * client on the first element of {@code cluster} is used to look up the group leader, is shut down, and the returned client
     * is started on the cluster service whose local member name equals the leader's consistent id.
     *
     * @param grpId Replication group id of the partition.
     * @param membersConf Raft group configuration of the partition.
     * @return Started raft client.
     * @throws Exception If a client fails to start within 5 seconds.
     */
    private RaftGroupService startPartitionRaftClient(TablePartitionId grpId, PeersAndLearners membersConf) throws Exception {
        if (startClient()) {
            return RaftGroupServiceImpl
                    .start(grpId, client, FACTORY, raftConfiguration, membersConf, true, executor)
                    .get(5, TimeUnit.SECONDS);
        }

        // Create temporary client to find a leader address.
        ClusterService tmpSvc = cluster.get(0);

        RaftGroupService service = RaftGroupServiceImpl
                .start(grpId, tmpSvc, FACTORY, raftConfiguration, membersConf, true, executor)
                .get(5, TimeUnit.SECONDS);

        Peer leader = service.leader();

        service.shutdown();

        // The lambda parameter is deliberately not named "cluster" to avoid shadowing the field being streamed.
        ClusterService leaderSrv = cluster.stream()
                .filter(node -> node.topologyService().localMember().name().equals(leader.consistentId()))
                .findAny()
                .orElseThrow();

        return RaftGroupServiceImpl
                .start(grpId, leaderSrv, FACTORY, raftConfiguration, membersConf, true, executor)
                .get(5, TimeUnit.SECONDS);
    }