public void before()

in modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java [240:401]


    public void before() throws Exception {
        // Builds the whole test fixture, in this order:
        //   1. starts nodes() cluster services and waits until every node sees the full topology;
        //   2. optionally (startClient()) starts a separate "client" node with its own clock and replica service;
        //   3. starts a Raft server, replica manager, replica service, tx manager and tx state storage per node;
        //   4. starts the partition groups of the "accounts" and "customers" tables;
        //   5. wires the client-side tx manager and the two TableImpl facades used by the tests.
        int nodes = nodes();
        int replicas = replicas();

        assertTrue(nodes > 0);
        assertTrue(replicas > 0);

        // Evaluated once so that every decision below is based on the same answer.
        boolean withClient = startClient();

        List<NetworkAddress> localAddresses = findLocalAddresses(NODE_PORT_BASE,
                NODE_PORT_BASE + nodes);

        var nodeFinder = new StaticNodeFinder(localAddresses);

        clusterServices = new HashMap<>(nodes);

        // Nodes are started in parallel, but the results are gathered by the stream itself instead of being
        // written into shared collections from worker threads: clusterServices is a plain HashMap, which is
        // not safe for concurrent put(). toArray() also keeps the encounter order of the addresses, so the
        // index-based code below ("node" + i, TransactionIdGenerator(i)) maps to the same node on every run.
        ClusterService[] startedNodes = nodeFinder.findNodes().parallelStream()
                .map(addr -> startNode(testInfo, addr.toString(), addr.port(), nodeFinder))
                .toArray(ClusterService[]::new);

        for (ClusterService svc : startedNodes) {
            cluster.add(svc);
            clusterServices.put(svc.topologyService().localMember().name(), svc);
        }

        for (ClusterService node : cluster) {
            assertTrue(waitForTopology(node, nodes, 1000));
        }

        log.info("The cluster has been started");

        if (withClient) {
            // The client listens on the port right below the server port range.
            client = startNode(testInfo, "client", NODE_PORT_BASE - 1, nodeFinder);

            // The topology now consists of all server nodes plus the client.
            assertTrue(waitForTopology(client, nodes + 1, 1000));

            clientClock = new HybridClockImpl();

            // NOTE(review): the message mentions a replica manager, although only a ReplicaService is created
            // for the client below - confirm the intended wording.
            log.info("Replica manager has been started, node=[" + client.topologyService().localMember() + ']');

            clientReplicaSvc = new ReplicaService(
                    client.messagingService(),
                    clientClock
            );

            log.info("The client has been started");
        }

        // Start raft servers. Each raft server can hold multiple groups.
        clocks = new HashMap<>(nodes);
        raftServers = new HashMap<>(nodes);
        replicaManagers = new HashMap<>(nodes);
        replicaServices = new HashMap<>(nodes);
        txManagers = new HashMap<>(nodes);
        txStateStorages = new HashMap<>(nodes);

        executor = new ScheduledThreadPoolExecutor(20,
                new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));

        for (int i = 0; i < nodes; i++) {
            ClusterService clusterSvc = cluster.get(i);
            ClusterNode node = clusterSvc.topologyService().localMember();
            String nodeName = node.name();

            // One clock per node, shared by all components of that node.
            HybridClock clock = new HybridClockImpl();

            clocks.put(nodeName, clock);

            var raftSrv = new Loza(
                    clusterSvc,
                    raftConfiguration,
                    workDir.resolve("node" + i),
                    clock
            );

            raftSrv.start();

            raftServers.put(nodeName, raftSrv);

            var cmgManager = mock(ClusterManagementGroupManager.class);

            // This test is run without Meta storage.
            when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of()));

            ReplicaManager replicaMgr = new ReplicaManager(
                    nodeName,
                    clusterSvc,
                    cmgManager,
                    clock,
                    Set.of(TableMessageGroup.class, TxMessageGroup.class)
            );

            replicaMgr.start();

            replicaManagers.put(nodeName, replicaMgr);

            log.info("Replica manager has been started, node=[" + node + ']');

            ReplicaService replicaSvc = new ReplicaService(
                    clusterSvc.messagingService(),
                    clock
            );

            replicaServices.put(nodeName, replicaSvc);

            TxManagerImpl txMgr = new TxManagerImpl(replicaSvc, new HeapLockManager(), clock, new TransactionIdGenerator(i));

            txMgr.start();

            txManagers.put(nodeName, txMgr);

            txStateStorages.put(nodeName, new TestTxStateStorage());
        }

        log.info("Raft servers have been started");

        final String accountsName = "accounts";
        final String customersName = "customers";

        int accTblId = 1;
        int custTblId = 2;

        accRaftClients = startTable(accTblId, ACCOUNTS_SCHEMA);
        custRaftClients = startTable(custTblId, CUSTOMERS_SCHEMA);

        log.info("Partition groups have been started");

        // Name of the node that owns the raft client stored under key 0 of the accounts table
        // (presumably partition 0 - confirm against startTable()).
        String localNodeName = accRaftClients.get(0).clusterService().topologyService().localMember().name();

        // Replica service and clock handed to both tables: the client's own ones when a separate client is
        // started, otherwise those of the node named above.
        ReplicaService tableReplicaSvc;
        HybridClock tableClock;

        if (withClient) {
            clientTxManager = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(), clientClock, new TransactionIdGenerator(-1));

            tableReplicaSvc = clientReplicaSvc;
            tableClock = clientClock;
        } else {
            // Collocated mode.
            clientTxManager = txManagers.get(localNodeName);

            tableReplicaSvc = replicaServices.get(localNodeName);
            tableClock = clocks.get(localNodeName);
        }

        assertNotNull(clientTxManager);

        igniteTransactions = new IgniteTransactionsImpl(clientTxManager);

        this.accounts = new TableImpl(new InternalTableImpl(
                accountsName,
                accTblId,
                accRaftClients,
                1,
                consistentIdToNode,
                clientTxManager,
                mock(MvTableStorage.class),
                mock(TxStateTableStorage.class),
                tableReplicaSvc,
                tableClock
        ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), clientTxManager.lockManager());

        this.customers = new TableImpl(new InternalTableImpl(
                customersName,
                custTblId,
                custRaftClients,
                1,
                consistentIdToNode,
                clientTxManager,
                mock(MvTableStorage.class),
                mock(TxStateTableStorage.class),
                tableReplicaSvc,
                tableClock
        ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), clientTxManager.lockManager());

        log.info("Tables have been started");
    }