public DummyInternalTableImpl()

in modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java [277:590]


    public DummyInternalTableImpl(
            ReplicaService replicaSvc,
            MvPartitionStorage mvPartStorage,
            boolean crossTableUsage,
            @Nullable TransactionStateResolver transactionStateResolver,
            SchemaDescriptor schema,
            HybridTimestampTracker tracker,
            PlacementDriver placementDriver,
            ReplicationConfiguration replicationConfiguration,
            TransactionConfiguration txConfiguration,
            SystemDistributedConfiguration systemCfg,
            RemotelyTriggeredResourceRegistry resourcesRegistry,
            TransactionInflights transactionInflights
    ) {
        super(
                QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, "test"),
                ZONE_ID, // zone id.
                nextTableId.getAndIncrement(), // table id.
                1, // number of partitions.
                new SingleClusterNodeResolver(LOCAL_NODE),
                txManager(replicaSvc, placementDriver, txConfiguration, systemCfg, resourcesRegistry),
                mock(MvTableStorage.class),
                new TestTxStateStorage(),
                replicaSvc,
                CLOCK_SERVICE,
                tracker,
                placementDriver,
                transactionInflights,
                null,
                mock(StreamerReceiverRunner.class),
                () -> 10_000L,
                () -> 10_000L
        );

        RaftGroupService svc = mock(RaftGroupService.class);

        groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : crossTableGroupId;

        lenient().doReturn(groupId).when(svc).groupId();
        Peer leaderPeer = new Peer(UUID.randomUUID().toString());
        lenient().doReturn(leaderPeer).when(svc).leader();
        lenient().doReturn(completedFuture(new LeaderWithTerm(leaderPeer, 1L))).when(svc).refreshAndGetLeaderWithTerm();

        if (!crossTableUsage) {
            // Delegate replica requests directly to replica listener.
            lenient()
                    .doAnswer(invocationOnMock -> {
                        ClusterNode node = invocationOnMock.getArgument(0);

                        return replicaListener.invoke(invocationOnMock.getArgument(1), node.id()).thenApply(ReplicaResult::result);
                    })
                    .when(replicaSvc).invoke(any(ClusterNode.class), any());

            lenient()
                    .doAnswer(invocationOnMock -> {
                        String nodeConsistenId = invocationOnMock.getArgument(0);
                        UUID nodeId = deriveUuidFrom(nodeConsistenId);

                        return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId).thenApply(ReplicaResult::result);
                    })
                    .when(replicaSvc).invoke(anyString(), any());

            lenient()
                    .doAnswer(invocationOnMock -> {
                        ClusterNode node = invocationOnMock.getArgument(0);

                        return replicaListener.invoke(invocationOnMock.getArgument(1), node.id())
                                .thenApply(DummyInternalTableImpl::dummyTimestampAwareResponse);
                    })
                    .when(replicaSvc).invokeRaw(any(ClusterNode.class), any());

            lenient()
                    .doAnswer(invocationOnMock -> {
                        String nodeConsistenId = invocationOnMock.getArgument(0);
                        UUID nodeId = deriveUuidFrom(nodeConsistenId);

                        return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId)
                                .thenApply(DummyInternalTableImpl::dummyTimestampAwareResponse);
                    })
                    .when(replicaSvc).invokeRaw(anyString(), any());
        }

        AtomicLong raftIndex = new AtomicLong(1);

        // Delegate directly to listener.
        lenient().doAnswer(
                invocationClose -> {
                    synchronized (raftServiceMutex) {
                        Command cmd = invocationClose.getArgument(0);

                        long commandIndex = raftIndex.incrementAndGet();

                        HybridTimestamp safeTs = cmd instanceof SafeTimePropagatingCommand ? CLOCK.now() : null;

                        CompletableFuture<Serializable> res = new CompletableFuture<>();

                        // All read commands are handled directly throw partition replica listener.
                        CommandClosure<WriteCommand> clo = new CommandClosure<>() {
                            /** {@inheritDoc} */
                            @Override
                            public long index() {
                                return commandIndex;
                            }

                            /** {@inheritDoc} */
                            @Override
                            public HybridTimestamp safeTimestamp() {
                                return safeTs;
                            }

                            /** {@inheritDoc} */
                            @Override
                            public @Nullable WriteCommand command() {
                                return (WriteCommand) cmd;
                            }

                            /** {@inheritDoc} */
                            @Override
                            public void result(@Nullable Serializable r) {
                                if (r instanceof Throwable) {
                                    res.completeExceptionally((Throwable) r);
                                } else {
                                    res.complete(r);
                                }
                            }
                        };

                        try {
                            partitionListener.onWrite(List.of(clo).iterator());
                        } catch (Throwable e) {
                            res.completeExceptionally(new TransactionException(INTERNAL_ERR, e));
                        }

                        return res;
                    }
                }
        ).when(svc).run(any());

        int tableId = tableId();
        int indexId = 1;

        ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schema);

        StorageHashIndexDescriptor pkIndexDescriptor = mock(StorageHashIndexDescriptor.class);

        when(pkIndexDescriptor.columns()).then(
                invocation -> Collections.nCopies(schema.keyColumns().size(), mock(StorageHashIndexColumnDescriptor.class))
        );

        Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
                indexId,
                new TestHashIndexStorage(PART_ID, pkIndexDescriptor),
                row2Tuple
        ));

        IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2Tuple);

        safeTime = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);

        PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage);
        TableIndexStoragesSupplier indexes = createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));

        IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(indexes);

        StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
                PART_ID,
                partitionDataStorage,
                indexUpdateHandler,
                replicationConfiguration
        );

        DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema);

        Catalog catalog = mock(Catalog.class);
        CatalogService catalogService = mock(CatalogService.class);
        CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);

        lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);
        lenient().when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
        lenient().when(catalog.table(anyInt())).thenReturn(tableDescriptor);
        lenient().when(tableDescriptor.tableVersion()).thenReturn(1);

        CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class);
        lenient().when(indexDescriptor.id()).thenReturn(pkStorage.get().id());

        lenient().when(catalog.indexes(anyInt())).thenReturn(List.of(indexDescriptor));

        ZonePartitionId zonePartitionId = new ZonePartitionId(ZONE_ID, PART_ID);
        TablePartitionId tablePartitionId = new TablePartitionId(tableId, PART_ID);

        var tableReplicaListener = new PartitionReplicaListener(
                mvPartStorage,
                svc,
                this.txManager,
                this.txManager.lockManager(),
                Runnable::run,
                tablePartitionId,
                tableId,
                () -> Map.of(pkLocker.id(), pkLocker),
                pkStorage,
                Map::of,
                CLOCK_SERVICE,
                safeTime,
                txStateStorage().getOrCreatePartitionStorage(PART_ID),
                transactionStateResolver,
                storageUpdateHandler,
                new DummyValidationSchemasSource(schemaManager),
                LOCAL_NODE,
                new AlwaysSyncedSchemaSyncService(),
                catalogService,
                placementDriver,
                mock(ClusterNodeResolver.class),
                resourcesRegistry,
                schemaManager,
                mock(IndexMetaStorage.class),
                new TestLowWatermark(),
                mock(FailureProcessor.class)
        );

        if (enabledColocation) {
            ZonePartitionReplicaListener zoneReplicaListener = new ZonePartitionReplicaListener(
                    txStateStorage().getOrCreatePartitionStorage(PART_ID),
                    CLOCK_SERVICE,
                    this.txManager,
                    new DummyValidationSchemasSource(schemaManager),
                    new AlwaysSyncedSchemaSyncService(),
                    catalogService,
                    placementDriver,
                    mock(ClusterNodeResolver.class),
                    svc,
                    mock(FailureProcessor.class),
                    LOCAL_NODE,
                    zonePartitionId
            );

            zoneReplicaListener.addTableReplicaProcessor(tableId, raftClient -> tableReplicaListener);

            replicaListener = zoneReplicaListener;
        } else {
            replicaListener = tableReplicaListener;
        }

        HybridClock clock = new HybridClockImpl();
        ClockService clockService = mock(ClockService.class);
        lenient().when(clockService.current()).thenReturn(clock.current());

        PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
        var tablePartitionListener = new PartitionListener(
                this.txManager,
                new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage),
                storageUpdateHandler,
                txStateStorage().getOrCreatePartitionStorage(PART_ID),
                safeTime,
                storageIndexTracker,
                catalogService,
                schemaManager,
                mock(IndexMetaStorage.class),
                LOCAL_NODE.id(),
                mock(MinimumRequiredTimeCollectorService.class),
                mock(Executor.class),
                placementDriver,
                clockService,
                enabledColocation ? zonePartitionId : tablePartitionId
        );

        if (enabledColocation) {
            ZonePartitionRaftListener zoneRaftListener = new ZonePartitionRaftListener(
                    zonePartitionId,
                    txStateStorage().getOrCreatePartitionStorage(PART_ID),
                    this.txManager,
                    safeTime,
                    storageIndexTracker,
                    new NoOpPartitionsSnapshots(),
                    mock(Executor.class)
            );

            zoneRaftListener.addTableProcessor(tableId, tablePartitionListener);

            partitionListener = zoneRaftListener;
        } else {
            partitionListener = tablePartitionListener;
        }

        // Update(All)Command handling requires both information about raft group topology and the primary replica,
        // thus onConfigurationCommited and primaryReplicaChangeCommand are called.
        {
            partitionListener.onConfigurationCommitted(
                    new RaftGroupConfiguration(
                            1,
                            1,
                            List.of(LOCAL_NODE.name()),
                            Collections.emptyList(),
                            null,
                            null
                    ),
                    1,
                    1
            );

            CompletableFuture<ReplicaMeta> primaryMetaFuture = placementDriver.getPrimaryReplica(groupId, CLOCK.now());

            assertThat(primaryMetaFuture, willCompleteSuccessfully());

            ReplicaMeta primary = primaryMetaFuture.join();

            PrimaryReplicaChangeCommand primaryReplicaChangeCommand = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
                    .leaseStartTime(primary.getStartTime().longValue())
                    .primaryReplicaNodeId(primary.getLeaseholderId())
                    .primaryReplicaNodeName(primary.getLeaseholder())
                    .build();

            assertThat(svc.run(primaryReplicaChangeCommand), willCompleteSuccessfully());
        }
    }