in modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java [369:576]
public void prepareCluster() throws Exception {
assertTrue(nodes > 0);
assertTrue(replicas > 0);
clusterServices = new ConcurrentHashMap<>(nodes);
localAddresses.parallelStream()
.forEach(addr -> {
ClusterService svc = startNode(testInfo, addr.toString(), addr.port(), nodeFinder);
cluster.add(svc);
clusterServices.put(extractConsistentId(svc), svc);
});
for (ClusterService node : cluster) {
assertTrue(waitForTopology(node, nodes, 1000));
}
ClusterNode firstNode = first(cluster).topologyService().localMember();
placementDriver = new TestPlacementDriver(firstNode);
catalogService = mock(CatalogService.class);
catalog = mock(Catalog.class);
lenient().when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);
LOG.info("The cluster has been started");
if (startClient) {
startClient();
}
// Start raft servers. Each raft server can hold multiple groups.
clocks = new HashMap<>(nodes);
clockWaiters = new ArrayList<>(nodes);
clockServices = new HashMap<>(nodes);
raftServers = new HashMap<>(nodes);
replicaManagers = new HashMap<>(nodes);
replicaServices = new HashMap<>(nodes);
txManagers = new HashMap<>(nodes);
resourceCleanupManagers = new HashMap<>(nodes);
txInflights = new HashMap<>(nodes);
cursorRegistries = new HashMap<>(nodes);
txStateStorages = new HashMap<>(nodes);
raftConfigurers = new HashMap<>(nodes);
logStorageFactories = new HashMap<>(nodes);
executor = new ScheduledThreadPoolExecutor(20,
new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
partitionOperationsExecutor = Executors.newFixedThreadPool(
5,
NamedThreadFactory.create("test", "partition-operations", LOG)
);
for (int i = 0; i < nodes; i++) {
ClusterService clusterService = cluster.get(i);
ClusterNode node = clusterService.topologyService().localMember();
HybridClock clock = createClock(node);
ClockWaiter clockWaiter = new ClockWaiter("test-node" + i, clock, executor);
assertThat(clockWaiter.startAsync(new ComponentContext()), willCompleteSuccessfully());
TestClockService clockService = new TestClockService(clock, clockWaiter);
String nodeName = node.name();
clocks.put(nodeName, clock);
clockWaiters.add(clockWaiter);
clockServices.put(nodeName, clockService);
Path partitionsWorkDir = workDir.resolve("node" + i);
LogStorageFactory partitionsLogStorageFactory = SharedLogStorageFactoryUtils.create(
"test",
clusterService.nodeName(),
partitionsWorkDir.resolve("log"),
raftConfig.fsync().value()
);
logStorageFactories.put(nodeName, partitionsLogStorageFactory);
assertThat(partitionsLogStorageFactory.startAsync(new ComponentContext()), willCompleteSuccessfully());
RaftGroupOptionsConfigurer partitionRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageFactory, partitionsWorkDir.resolve("meta"));
raftConfigurers.put(nodeName, partitionRaftConfigurer);
var raftSrv = TestLozaFactory.create(
clusterService,
raftConfig,
clock,
new RaftGroupEventsClientListener()
);
assertThat(raftSrv.startAsync(new ComponentContext()), willCompleteSuccessfully());
raftServers.put(nodeName, raftSrv);
var cmgManager = mock(ClusterManagementGroupManager.class);
// This test is run without Meta storage.
when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());
var commandMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
var raftClientFactory = new TopologyAwareRaftGroupServiceFactory(
clusterService,
logicalTopologyService(clusterService),
Loza.FACTORY,
new RaftGroupEventsClientListener()
);
ReplicaManager replicaMgr = new ReplicaManager(
nodeName,
clusterService,
cmgManager,
clockService,
Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class),
placementDriver,
partitionOperationsExecutor,
this::getSafeTimePropagationTimeout,
new NoOpFailureManager(),
commandMarshaller,
raftClientFactory,
raftSrv,
partitionRaftConfigurer,
new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")),
ForkJoinPool.commonPool(),
replicaGrpId -> nullCompletedFuture()
);
assertThat(replicaMgr.startAsync(new ComponentContext()), willCompleteSuccessfully());
replicaManagers.put(nodeName, replicaMgr);
LOG.info("Replica manager has been started, node=[" + node + ']');
ReplicaService replicaSvc = spy(new ReplicaService(
clusterService.messagingService(),
clock,
partitionOperationsExecutor,
replicationConfiguration,
executor
));
replicaServices.put(nodeName, replicaSvc);
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
TransactionInflights transactionInflights = new TransactionInflights(placementDriver, clockService);
txInflights.put(nodeName, transactionInflights);
cursorRegistries.put(nodeName, resourcesRegistry);
TxManagerImpl txMgr = newTxManager(
clusterService,
replicaSvc,
clockService,
new TransactionIdGenerator(i),
node,
placementDriver,
resourcesRegistry,
transactionInflights,
lowWatermark
);
ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager(
nodeName,
resourcesRegistry,
clusterService.topologyService(),
clusterService.messagingService(),
transactionInflights,
txMgr,
lowWatermark,
new NoOpFailureManager()
);
assertThat(txMgr.startAsync(new ComponentContext()), willCompleteSuccessfully());
txManagers.put(nodeName, txMgr);
assertThat(resourceVacuumManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
resourceCleanupManagers.put(nodeName, resourceVacuumManager);
txStateStorages.put(nodeName, new TestTxStatePartitionStorage());
}
LOG.info("Raft servers have been started");
LOG.info("Partition groups have been started");
localNodeName = extractConsistentId(cluster.get(0));
if (startClient) {
initializeClientTxComponents();
} else {
// Collocated mode.
clientTxManager = txManagers.get(localNodeName);
clientResourceVacuumManager = resourceCleanupManagers.get(localNodeName);
clientTransactionInflights = txInflights.get(localNodeName);
}
igniteTransactions = new IgniteTransactionsImpl(clientTxManager, timestampTracker);
assertNotNull(clientTxManager);
}