static void beforeAllTests()

in modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java [196:375]


    /**
     * Prepares the static fixtures ({@code txManager} and {@code intTable}) used by this test class.
     *
     * <p>The setup wires together:
     * <ul>
     *     <li>a {@link TxManagerImpl} whose {@code finish} is overridden to complete immediately;</li>
     *     <li>one mocked {@link RaftGroupService} per partition that records every command it is asked to run in
     *     {@code CMDS_MAP};</li>
     *     <li>a mocked {@link ReplicaService} that converts single-row and multi-row replica requests into update commands
     *     and forwards them to the Raft mock of the addressed replication group;</li>
     *     <li>the {@link InternalTableImpl} under test, built on top of the mocks above.</li>
     * </ul>
     */
    static void beforeAllTests() {
        ClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;

        // Deep stubs allow the topologyService().localMember() chain to be stubbed in one expression.
        ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
        when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
        when(clusterService.topologyService().localMember()).thenReturn(localNode);

        ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);

        RemotelyTriggeredResourceRegistry resourcesRegistry = new RemotelyTriggeredResourceRegistry();
        PlacementDriver placementDriver = new TestPlacementDriver(localNode);

        HybridClock clock = new HybridClockImpl();
        ClockService clockService = new TestClockService(clock);

        TransactionInflights transactionInflights = new TransactionInflights(placementDriver, clockService);

        HeapLockManager lockManager = new HeapLockManager(systemLocalConfiguration);
        TransactionIdGenerator txIdGenerator = new TransactionIdGenerator(0xdeadbeef);

        txManager = new TxManagerImpl(
                txConfiguration,
                systemDistributedConfiguration,
                clusterService,
                replicaService,
                lockManager,
                clockService,
                txIdGenerator,
                placementDriver,
                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                new TestLocalRwTxCounter(),
                resourcesRegistry,
                transactionInflights,
                new TestLowWatermark(),
                commonExecutor
        ) {
            /** Finishing a transaction is a no-op here: an already completed future is returned. */
            @Override
            public CompletableFuture<Void> finish(
                    HybridTimestampTracker timestampTracker,
                    ReplicationGroupId commitPartition,
                    boolean commitIntent,
                    boolean timeoutExceeded,
                    Map<ReplicationGroupId, PendingTxPartitionEnlistment> enlistedGroups,
                    UUID txId
            ) {
                return nullCompletedFuture();
            }
        };

        assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully());

        // NOTE(review): raftByPartition is filled below but not read anywhere inside this method.
        Int2ObjectMap<RaftGroupService> raftByPartition = new Int2ObjectOpenHashMap<>();
        Map<ReplicationGroupId, RaftGroupService> raftByGroup = new HashMap<>();

        for (int partition = 0; partition < PARTS; partition++) {
            // Effectively final copy of the loop counter so the answer lambda can capture it.
            final int partitionIndex = partition;

            RaftGroupService raftClient = mock(RaftGroupService.class);

            doAnswer(invocation -> {
                Command command = invocation.getArgument(0);

                // Map.merge hands the value already stored in the map to the function first and the supplied value second.
                // The freshly created set absorbs the previously recorded commands and replaces the stored set.
                CMDS_MAP.merge(partitionIndex, new HashSet<>(Set.of(command)), (recorded, incoming) -> {
                    incoming.addAll(recorded);

                    return incoming;
                });

                if (!(command instanceof UpdateAllCommand)) {
                    return trueCompletedFuture();
                }

                // A batch update is answered with one NullBinaryRow per row key of the command.
                UpdateAllCommand updateAll = (UpdateAllCommand) command;

                return completedFuture(updateAll.rowsToUpdate().keySet().stream()
                        .map(rowId -> new NullBinaryRow())
                        .collect(Collectors.toList()));
            }).when(raftClient).run(any());

            raftByPartition.put(partition, raftClient);

            // The replication group key is zone-based or table-based depending on the colocation mode.
            ReplicationGroupId groupId;
            if (enabledColocation()) {
                groupId = new ZonePartitionId(ZONE_ID, partition);
            } else {
                groupId = new TablePartitionId(TABLE_ID, partition);
            }

            raftByGroup.put(groupId, raftClient);
        }

        // Translates a replica request into an update command and runs it on the Raft mock of the request's group.
        Answer<CompletableFuture<?>> replicaAnswer = invocation -> {
            String targetNodeName = invocation.getArgument(0);
            ClusterNode targetNode = clusterNodeByName(targetNodeName);
            ReplicaRequest request = invocation.getArgument(1);

            // Partition 0 is always used as the commit partition of the generated commands.
            ReplicationGroupIdMessage commitPartitionId;
            if (enabledColocation()) {
                commitPartitionId = toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, new ZonePartitionId(ZONE_ID, 0));
            } else {
                commitPartitionId = toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, new TablePartitionId(TABLE_ID, 0));
            }

            RaftGroupService raftClient = raftByGroup.get(request.groupId().asReplicationGroupId());

            if (!(request instanceof ReadWriteMultiRowReplicaRequest)) {
                // Anything that is not a multi-row request is required to be a single-row request.
                assertThat(request, is(instanceOf(ReadWriteSingleRowReplicaRequest.class)));

                ReadWriteSingleRowReplicaRequest singleRowRequest = (ReadWriteSingleRowReplicaRequest) request;

                return raftClient.run(PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommand()
                        .tableId(TABLE_ID)
                        .commitPartitionId(commitPartitionId)
                        .rowUuid(UUID.randomUUID())
                        .messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                                .binaryRowMessage(binaryRowMessage(singleRowRequest.binaryTuple(), singleRowRequest))
                                .build())
                        .txId(TestTransactionIds.newTransactionId())
                        .txCoordinatorId(targetNode.id())
                        .initiatorTime(clock.now())
                        .build());
            }

            ReadWriteMultiRowReplicaRequest multiRowRequest = (ReadWriteMultiRowReplicaRequest) request;

            // Every tuple of the batch becomes a row message keyed by a newly generated identifier.
            Map<UUID, TimedBinaryRowMessage> rowsById = multiRowRequest.binaryTuples().stream()
                    .collect(toMap(
                            tuple -> TestTransactionIds.newTransactionId(),
                            tuple -> PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                                    .binaryRowMessage(binaryRowMessage(tuple, multiRowRequest))
                                    .build()
                    ));

            return raftClient.run(PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommand()
                    .tableId(TABLE_ID)
                    .commitPartitionId(commitPartitionId)
                    .messageRowsToUpdate(rowsById)
                    .txId(UUID.randomUUID())
                    .txCoordinatorId(targetNode.id())
                    .initiatorTime(clock.now())
                    .build());
        };

        when(replicaService.invoke(any(String.class), any())).thenAnswer(replicaAnswer);

        // invokeRaw reuses the same translation and wraps its outcome into a timestamp-aware response.
        when(replicaService.invokeRaw(any(String.class), any())).thenAnswer(invocation -> {
            CompletableFuture<?> commandFuture = replicaAnswer.answer(invocation);

            return commandFuture.thenApply(payload -> new TimestampAwareReplicaResponse() {
                @Override
                public @Nullable HybridTimestamp timestamp() {
                    return clock.now();
                }

                @Override
                public @Nullable Object result() {
                    return payload;
                }

                @Override
                public short groupType() {
                    return 0;
                }

                @Override
                public short messageType() {
                    return 0;
                }

                @Override
                public MessageSerializer<NetworkMessage> serializer() {
                    return null;
                }

                @Override
                public NetworkMessage clone() {
                    return null;
                }
            });
        });

        intTable = new InternalTableImpl(
                QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, "TEST"),
                // Zone identifier.
                ZONE_ID,
                // Table identifier.
                TABLE_ID,
                // Partition count.
                PARTS,
                new SingleClusterNodeResolver(localNode),
                txManager,
                mock(MvTableStorage.class),
                new TestTxStateStorage(),
                replicaService,
                clockService,
                observableTimestampTracker,
                // A separate placement driver instance, distinct from the one handed to the transaction manager.
                new TestPlacementDriver(localNode),
                transactionInflights,
                null,
                mock(StreamerReceiverRunner.class),
                () -> 10_000L,
                () -> 10_000L
        );
    }