public void prepareCluster()

in modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java [369:576]


    public void prepareCluster() throws Exception {
        assertTrue(nodes > 0);
        assertTrue(replicas > 0);

        clusterServices = new ConcurrentHashMap<>(nodes);

        localAddresses.parallelStream()
                .forEach(addr -> {
                    ClusterService svc = startNode(testInfo, addr.toString(), addr.port(), nodeFinder);
                    cluster.add(svc);
                    clusterServices.put(extractConsistentId(svc), svc);
                });

        for (ClusterService node : cluster) {
            assertTrue(waitForTopology(node, nodes, 1000));
        }

        ClusterNode firstNode = first(cluster).topologyService().localMember();

        placementDriver = new TestPlacementDriver(firstNode);

        catalogService = mock(CatalogService.class);
        catalog = mock(Catalog.class);
        lenient().when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
        lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);

        LOG.info("The cluster has been started");

        if (startClient) {
            startClient();
        }

        // Start raft servers. Each raft server can hold multiple groups.
        clocks = new HashMap<>(nodes);
        clockWaiters = new ArrayList<>(nodes);
        clockServices = new HashMap<>(nodes);
        raftServers = new HashMap<>(nodes);
        replicaManagers = new HashMap<>(nodes);
        replicaServices = new HashMap<>(nodes);
        txManagers = new HashMap<>(nodes);
        resourceCleanupManagers = new HashMap<>(nodes);
        txInflights = new HashMap<>(nodes);
        cursorRegistries = new HashMap<>(nodes);
        txStateStorages = new HashMap<>(nodes);
        raftConfigurers = new HashMap<>(nodes);
        logStorageFactories = new HashMap<>(nodes);

        executor = new ScheduledThreadPoolExecutor(20,
                new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));

        partitionOperationsExecutor = Executors.newFixedThreadPool(
                5,
                NamedThreadFactory.create("test", "partition-operations", LOG)
        );

        for (int i = 0; i < nodes; i++) {
            ClusterService clusterService = cluster.get(i);

            ClusterNode node = clusterService.topologyService().localMember();

            HybridClock clock = createClock(node);
            ClockWaiter clockWaiter = new ClockWaiter("test-node" + i, clock, executor);
            assertThat(clockWaiter.startAsync(new ComponentContext()), willCompleteSuccessfully());
            TestClockService clockService = new TestClockService(clock, clockWaiter);

            String nodeName = node.name();

            clocks.put(nodeName, clock);
            clockWaiters.add(clockWaiter);
            clockServices.put(nodeName, clockService);

            Path partitionsWorkDir = workDir.resolve("node" + i);

            LogStorageFactory partitionsLogStorageFactory = SharedLogStorageFactoryUtils.create(
                    "test",
                    clusterService.nodeName(),
                    partitionsWorkDir.resolve("log"),
                    raftConfig.fsync().value()
            );

            logStorageFactories.put(nodeName, partitionsLogStorageFactory);

            assertThat(partitionsLogStorageFactory.startAsync(new ComponentContext()), willCompleteSuccessfully());

            RaftGroupOptionsConfigurer partitionRaftConfigurer =
                    RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageFactory, partitionsWorkDir.resolve("meta"));

            raftConfigurers.put(nodeName, partitionRaftConfigurer);

            var raftSrv = TestLozaFactory.create(
                    clusterService,
                    raftConfig,
                    clock,
                    new RaftGroupEventsClientListener()
            );

            assertThat(raftSrv.startAsync(new ComponentContext()), willCompleteSuccessfully());

            raftServers.put(nodeName, raftSrv);

            var cmgManager = mock(ClusterManagementGroupManager.class);

            // This test is run without Meta storage.
            when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());

            var commandMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());

            var raftClientFactory = new TopologyAwareRaftGroupServiceFactory(
                    clusterService,
                    logicalTopologyService(clusterService),
                    Loza.FACTORY,
                    new RaftGroupEventsClientListener()
            );

            ReplicaManager replicaMgr = new ReplicaManager(
                    nodeName,
                    clusterService,
                    cmgManager,
                    clockService,
                    Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class),
                    placementDriver,
                    partitionOperationsExecutor,
                    this::getSafeTimePropagationTimeout,
                    new NoOpFailureManager(),
                    commandMarshaller,
                    raftClientFactory,
                    raftSrv,
                    partitionRaftConfigurer,
                    new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")),
                    ForkJoinPool.commonPool(),
                    replicaGrpId -> nullCompletedFuture()
            );

            assertThat(replicaMgr.startAsync(new ComponentContext()), willCompleteSuccessfully());

            replicaManagers.put(nodeName, replicaMgr);

            LOG.info("Replica manager has been started, node=[" + node + ']');

            ReplicaService replicaSvc = spy(new ReplicaService(
                    clusterService.messagingService(),
                    clock,
                    partitionOperationsExecutor,
                    replicationConfiguration,
                    executor
            ));

            replicaServices.put(nodeName, replicaSvc);

            var resourcesRegistry = new RemotelyTriggeredResourceRegistry();

            TransactionInflights transactionInflights = new TransactionInflights(placementDriver, clockService);

            txInflights.put(nodeName, transactionInflights);

            cursorRegistries.put(nodeName, resourcesRegistry);

            TxManagerImpl txMgr = newTxManager(
                    clusterService,
                    replicaSvc,
                    clockService,
                    new TransactionIdGenerator(i),
                    node,
                    placementDriver,
                    resourcesRegistry,
                    transactionInflights,
                    lowWatermark
            );

            ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager(
                    nodeName,
                    resourcesRegistry,
                    clusterService.topologyService(),
                    clusterService.messagingService(),
                    transactionInflights,
                    txMgr,
                    lowWatermark,
                    new NoOpFailureManager()
            );

            assertThat(txMgr.startAsync(new ComponentContext()), willCompleteSuccessfully());
            txManagers.put(nodeName, txMgr);

            assertThat(resourceVacuumManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
            resourceCleanupManagers.put(nodeName, resourceVacuumManager);

            txStateStorages.put(nodeName, new TestTxStatePartitionStorage());
        }

        LOG.info("Raft servers have been started");

        LOG.info("Partition groups have been started");

        localNodeName = extractConsistentId(cluster.get(0));

        if (startClient) {
            initializeClientTxComponents();
        } else {
            // Collocated mode.
            clientTxManager = txManagers.get(localNodeName);
            clientResourceVacuumManager = resourceCleanupManagers.get(localNodeName);
            clientTransactionInflights = txInflights.get(localNodeName);
        }

        igniteTransactions = new IgniteTransactionsImpl(clientTxManager, timestampTracker);

        assertNotNull(clientTxManager);
    }