public TableViewInternal startTable()

in modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java [630:843]


    /**
     * Creates a table on top of the test cluster, starts a replica for every (partition, assigned node) pair and
     * propagates the current primary replica of each replication group through its Raft group.
     *
     * <p>The method blocks twice: first until every replica start future has completed, and then until every
     * {@code PrimaryReplicaChangeCommand} submitted to the Raft clients has completed.
     *
     * <p>Side effects visible in this method: two values are taken from {@code globalCatalogId} (table ID first, index ID
     * second), stubs are installed on the {@code catalog} mock, and the new table is stored in {@code tables} under
     * {@code tableName}.
     *
     * @param tableName Table name; it is qualified with {@code SqlCommon.DEFAULT_SCHEMA_NAME} and used as the key in
     *         {@code tables}.
     * @param schemaDescriptor Schema used for the dummy schema managers, the key columns extractor and the size of the
     *         mocked primary key index column list.
     * @return The created table.
     * @throws Exception Declared by the signature; the checked source is not visible in this method (presumably
     *         {@code startReplica} or one of the {@code getOrCreateAndPopulate*} helpers - confirm).
     */
    public TableViewInternal startTable(String tableName, SchemaDescriptor schemaDescriptor) throws Exception {
        // NOTE(review): hard-coded zone ID; presumably refers to a zone prepared elsewhere in the fixture - confirm.
        int predefinedZoneId = 2;
        // Table and index IDs come from the same shared counter, so the order of the two getAndIncrement() calls matters.
        int tableId = globalCatalogId.getAndIncrement();

        // Mocked catalog descriptor that only reports the table ID and the fixed schema version.
        CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
        when(tableDescriptor.id()).thenReturn(tableId);
        when(tableDescriptor.tableVersion()).thenReturn(SCHEMA_VERSION);

        // Lenient stub: the catalog lookup by table ID is allowed to stay unused.
        lenient().when(catalog.table(eq(tableId))).thenReturn(tableDescriptor);

        // One assignment set per partition, computed over the consistent IDs of all cluster nodes.
        // NOTE(review): the literal 1 is presumably the partition count and must agree with the partition count passed
        // to InternalTableImpl below - confirm against calculateAssignments.
        List<Set<Assignment>> calculatedAssignments = calculateAssignments(
                cluster.stream().map(ItTxTestCluster::extractConsistentId).collect(toList()),
                1,
                replicas
        );

        // Keep only the consistent IDs of the assigned nodes; the index in the list is the partition ID.
        List<Set<String>> assignments = calculatedAssignments.stream()
                .map(a -> a.stream().map(Assignment::consistentId).collect(toSet()))
                .collect(toList());

        // Replication group ID per partition: zone-based when colocation is enabled, table-based otherwise.
        List<ReplicationGroupId> grpIds = IntStream.range(0, assignments.size())
                .mapToObj(i -> enabledColocation() ? new ZonePartitionId(predefinedZoneId, i) : new TablePartitionId(tableId, i))
                .collect(toList());

        // Collects replica start futures and, later, primary replica change propagation futures.
        List<CompletableFuture<?>> partitionReadyFutures = new ArrayList<>();

        int indexId = globalCatalogId.getAndIncrement();

        // Mocked primary key index descriptor; it is the only index the catalog mock reports for this table.
        CatalogIndexDescriptor pkCatalogIndexDescriptor = mock(CatalogIndexDescriptor.class);
        when(pkCatalogIndexDescriptor.id()).thenReturn(indexId);

        when(catalog.indexes(eq(tableId))).thenReturn(List.of(pkCatalogIndexDescriptor));

        // The internal table uses the client-side replica and clock services when startClient is set,
        // otherwise the services of the local node. Storages are mocks.
        InternalTableImpl internalTable = new InternalTableImpl(
                QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, tableName),
                predefinedZoneId,
                tableId,
                1, // number of partitions.
                nodeResolver,
                clientTxManager,
                mock(MvTableStorage.class),
                mock(TxStateStorage.class),
                startClient ? clientReplicaSvc : replicaServices.get(localNodeName),
                startClient ? clientClockService : clockServices.get(localNodeName),
                timestampTracker,
                placementDriver,
                clientTransactionInflights,
                // NOTE(review): which constructor parameter this null fills is not visible here.
                null,
                mock(StreamerReceiverRunner.class),
                // NOTE(review): two constant suppliers of 10_000; presumably timeouts in milliseconds - confirm
                // against the InternalTableImpl constructor.
                () -> 10_000L,
                () -> 10_000L
        );

        TableImpl table = new TableImpl(
                internalTable,
                new DummySchemaManagerImpl(schemaDescriptor),
                clientTxManager.lockManager(),
                new ConstantSchemaVersions(SCHEMA_VERSION),
                mock(IgniteSql.class),
                pkCatalogIndexDescriptor.id()
        );

        // The table is registered before its replicas are started.
        tables.put(tableName, table);

        // Start one replica per (partition, assigned node) pair.
        for (int p = 0; p < assignments.size(); p++) {
            Set<String> partAssignments = assignments.get(p);

            PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(partAssignments);

            ReplicationGroupId grpId = grpIds.get(p);

            // Each assignment is the consistent ID of a node; it is the key into the per-node service maps.
            for (String assignment : partAssignments) {
                // Effectively final copy of the loop variable so it can be captured by the lambdas below.
                int partId = p;

                // A fresh in-memory partition storage per replica; the tx state storage is looked up per node.
                var mvPartStorage = new TestMvPartitionStorage(partId);
                var txStateStorage = txStateStorages.get(assignment);
                TxMessageSender txMessageSender =
                        new TxMessageSender(
                                clusterServices.get(assignment).messagingService(),
                                replicaServices.get(assignment),
                                clockServices.get(assignment)
                        );

                // A resolver is created and started for every replica; it is not stopped in this method.
                var transactionStateResolver = new TransactionStateResolver(
                        txManagers.get(assignment),
                        clockServices.get(assignment),
                        nodeResolver,
                        clusterServices.get(assignment).messagingService(),
                        placementDriver,
                        txMessageSender
                );
                transactionStateResolver.start();

                ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);

                StorageHashIndexDescriptor pkIndexDescriptor = mock(StorageHashIndexDescriptor.class);

                // On each call, columns() answers with a list sized to the number of key columns whose elements are
                // all the same freshly created column descriptor mock.
                when(pkIndexDescriptor.columns()).then(invocation -> Collections.nCopies(
                        schemaDescriptor.keyColumns().size(),
                        mock(StorageHashIndexColumnDescriptor.class)
                ));

                // Primary key index storage backed by a test hash index and wrapped in Lazy; it is first
                // dereferenced when the index update handler is built below.
                Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
                        indexId,
                        new TestHashIndexStorage(partId, pkIndexDescriptor),
                        row2Tuple
                ));

                // The locker uses the lock manager of the node that hosts this replica.
                IndexLocker pkLocker = new HashIndexLocker(indexId, true, txManagers.get(assignment).lockManager(), row2Tuple);

                // Trackers start from the minimal timestamp and from index 0 respectively.
                SafeTimeValuesTracker safeTime = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
                PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);

                PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, partId, mvPartStorage);

                // The only index known to the update handler is the primary key index.
                IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
                        DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()))
                );

                StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
                        partId,
                        partitionDataStorage,
                        indexUpdateHandler,
                        replicationConfiguration
                );

                // Shared by the Raft listener and the replica listener of this replica.
                DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor);

                RaftGroupListener raftGroupListener = getOrCreateAndPopulateRaftGroupListener(
                        assignment,
                        predefinedZoneId,
                        partId,
                        tableId,
                        partitionDataStorage,
                        storageUpdateHandler,
                        txStateStorage,
                        safeTime,
                        storageIndexTracker,
                        catalogService,
                        schemaManager
                );

                // The replica listener is built from the Raft client that is passed to this function.
                // NOTE(review): a ZonePartitionId is always passed here, even when enabledColocation() is false and
                // the group ID above is a TablePartitionId - confirm this is intentional.
                Function<RaftGroupService, ReplicaListener> replicaListenerProvider =
                        raftClient -> getOrCreateAndPopulateReplicaListenerProvider(
                                assignment,
                                mvPartStorage,
                                raftClient,
                                txManagers.get(assignment),
                                new ZonePartitionId(predefinedZoneId, partId),
                                tableId,
                                () -> Map.of(pkLocker.id(), pkLocker),
                                pkStorage,
                                Map::of,
                                clockServices.get(assignment),
                                safeTime,
                                txStateStorage,
                                transactionStateResolver,
                                storageUpdateHandler,
                                new DummyValidationSchemasSource(schemaManager),
                                nodeResolver.getByConsistentId(assignment),
                                new AlwaysSyncedSchemaSyncService(),
                                catalogService,
                                placementDriver,
                                nodeResolver,
                                cursorRegistries.get(assignment),
                                schemaManager
                        );

                CompletableFuture<?> partitionReadyFuture = startReplica(
                        assignment,
                        raftGroupListener,
                        replicaListenerProvider,
                        storageIndexTracker,
                        grpId,
                        configuration
                );

                partitionReadyFutures.add(partitionReadyFuture);
            }
        }

        // Block until every replica start future has completed.
        allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();

        // Propagate the current primary replica of every replication group through its Raft group.
        for (int p = 0; p < assignments.size(); p++) {
            ReplicationGroupId grpId = grpIds.get(p);
            // The timestamp is taken from whichever clock service the map iterator returns first.
            CompletableFuture<ReplicaMeta> primaryFuture = placementDriver.getPrimaryReplica(grpId,
                    clockServices.values().iterator().next().now());

            // TestPlacementDriver always returns completed futures.
            assert primaryFuture.isDone();

            ReplicaMeta primary = primaryFuture.join();

            assert primary.getLeaseholderId() != null;

            PrimaryReplicaChangeCommand cmd = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
                    .leaseStartTime(primary.getStartTime().longValue())
                    .primaryReplicaNodeId(primary.getLeaseholderId())
                    .primaryReplicaNodeName(primary.getLeaseholder())
                    .build();

            CompletableFuture<RaftGroupService> raftClientFuture = getRaftClientForGroup(grpId);

            assertThat(raftClientFuture, willCompleteSuccessfully());

            CompletableFuture<?> primaryReplicaChangePropagationFuture = raftClientFuture.join().run(cmd);

            // Appended to the same list that already holds the replica start futures.
            partitionReadyFutures.add(primaryReplicaChangePropagationFuture);
        }

        // Waits for the propagation futures; the replica start futures in the list were already joined above.
        allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();

        return table;
    }