in modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java [240:401]
public void before() throws Exception {
int nodes = nodes();
int replicas = replicas();
assertTrue(nodes > 0);
assertTrue(replicas > 0);
List<NetworkAddress> localAddresses = findLocalAddresses(NODE_PORT_BASE,
NODE_PORT_BASE + nodes);
var nodeFinder = new StaticNodeFinder(localAddresses);
clusterServices = new HashMap<>(nodes);
nodeFinder.findNodes().parallelStream()
.forEach(addr -> {
ClusterService svc = startNode(testInfo, addr.toString(), addr.port(), nodeFinder);
cluster.add(svc);
clusterServices.put(svc.topologyService().localMember().name(), svc);
});
for (ClusterService node : cluster) {
assertTrue(waitForTopology(node, nodes, 1000));
}
log.info("The cluster has been started");
if (startClient()) {
client = startNode(testInfo, "client", NODE_PORT_BASE - 1, nodeFinder);
assertTrue(waitForTopology(client, nodes + 1, 1000));
clientClock = new HybridClockImpl();
log.info("Replica manager has been started, node=[" + client.topologyService().localMember() + ']');
clientReplicaSvc = new ReplicaService(
client.messagingService(),
clientClock
);
log.info("The client has been started");
}
// Start raft servers. Each raft server can hold multiple groups.
clocks = new HashMap<>(nodes);
raftServers = new HashMap<>(nodes);
replicaManagers = new HashMap<>(nodes);
replicaServices = new HashMap<>(nodes);
txManagers = new HashMap<>(nodes);
txStateStorages = new HashMap<>(nodes);
executor = new ScheduledThreadPoolExecutor(20,
new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
for (int i = 0; i < nodes; i++) {
ClusterNode node = cluster.get(i).topologyService().localMember();
HybridClock clock = new HybridClockImpl();
clocks.put(node.name(), clock);
var raftSrv = new Loza(
cluster.get(i),
raftConfiguration,
workDir.resolve("node" + i),
clock
);
raftSrv.start();
raftServers.put(node.name(), raftSrv);
var cmgManager = mock(ClusterManagementGroupManager.class);
// This test is run without Meta storage.
when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of()));
ReplicaManager replicaMgr = new ReplicaManager(
node.name(),
cluster.get(i),
cmgManager,
clock,
Set.of(TableMessageGroup.class, TxMessageGroup.class)
);
replicaMgr.start();
replicaManagers.put(node.name(), replicaMgr);
log.info("Replica manager has been started, node=[" + node + ']');
ReplicaService replicaSvc = new ReplicaService(
cluster.get(i).messagingService(),
clock
);
replicaServices.put(node.name(), replicaSvc);
TxManagerImpl txMgr = new TxManagerImpl(replicaSvc, new HeapLockManager(), clock, new TransactionIdGenerator(i));
txMgr.start();
txManagers.put(node.name(), txMgr);
txStateStorages.put(node.name(), new TestTxStateStorage());
}
log.info("Raft servers have been started");
final String accountsName = "accounts";
final String customersName = "customers";
int accTblId = 1;
int custTblId = 2;
accRaftClients = startTable(accTblId, ACCOUNTS_SCHEMA);
custRaftClients = startTable(custTblId, CUSTOMERS_SCHEMA);
log.info("Partition groups have been started");
String localNodeName = accRaftClients.get(0).clusterService().topologyService().localMember().name();
if (startClient()) {
clientTxManager = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(), clientClock, new TransactionIdGenerator(-1));
} else {
// Collocated mode.
clientTxManager = txManagers.get(localNodeName);
}
assertNotNull(clientTxManager);
igniteTransactions = new IgniteTransactionsImpl(clientTxManager);
this.accounts = new TableImpl(new InternalTableImpl(
accountsName,
accTblId,
accRaftClients,
1,
consistentIdToNode,
clientTxManager,
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
startClient() ? clientReplicaSvc : replicaServices.get(localNodeName),
startClient() ? clientClock : clocks.get(localNodeName)
), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), clientTxManager.lockManager());
this.customers = new TableImpl(new InternalTableImpl(
customersName,
custTblId,
custRaftClients,
1,
consistentIdToNode,
clientTxManager,
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
startClient() ? clientReplicaSvc : replicaServices.get(localNodeName),
startClient() ? clientClock : clocks.get(localNodeName)
), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), clientTxManager.lockManager());
log.info("Tables have been started");
}