in modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java [410:578]
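/**
 * Starts a test table: for every partition, starts a Raft group node and a partition replica on each
 * assigned node, and creates a Raft client per partition.
 *
 * @param tblId Table id.
 * @param schemaDescriptor Schema descriptor.
 * @return Raft group clients mapped by partition index.
 * @throws Exception If failed.
 */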
private Int2ObjectOpenHashMap<RaftGroupService> startTable(int tblId, SchemaDescriptor schemaDescriptor) throws Exception {
    List<Set<Assignment>> calculatedAssignments = AffinityUtils.calculateAssignments(
            cluster.stream().map(node -> node.topologyService().localMember().name()).collect(toList()),
            1,
            replicas()
    );

    List<Set<String>> assignments = calculatedAssignments.stream()
            .map(a -> a.stream().map(Assignment::consistentId).collect(toSet()))
            .collect(toList());

    List<TablePartitionId> grpIds = IntStream.range(0, assignments.size())
            .mapToObj(i -> new TablePartitionId(tblId, i))
            .collect(toList());

    Int2ObjectOpenHashMap<RaftGroupService> clients = new Int2ObjectOpenHashMap<>();

    List<CompletableFuture<Void>> partitionReadyFutures = new ArrayList<>();

    int globalIndexId = 1;
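
    // For every partition: start a Raft group node and a partition replica on each assigned node,
    // then create a Raft client for the partition.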
    for (int p = 0; p < assignments.size(); p++) {
        Set<String> partAssignments = assignments.get(p);

        TablePartitionId grpId = grpIds.get(p);

        for (String assignment : partAssignments) {
            var mvTableStorage = new TestMvTableStorage(tblId, DEFAULT_PARTITION_COUNT);
            var mvPartStorage = new TestMvPartitionStorage(0);
            var txStateStorage = txStateStorages.get(assignment);
            var placementDriver = new PlacementDriver(replicaServices.get(assignment), consistentIdToNode);

            for (int part = 0; part < assignments.size(); part++) {
                placementDriver.updateAssignment(grpIds.get(part), assignments.get(part));
            }

            int partId = p;

            int indexId = globalIndexId++;

            Function<BinaryRow, BinaryTuple> row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);
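
            // Primary key index storage and locker for this partition; both are fed to the replica listener below.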
            Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
                    indexId,
                    new TestHashIndexStorage(partId, null),
                    row2Tuple
            ));

            IndexLocker pkLocker = new HashIndexLocker(indexId, true, txManagers.get(assignment).lockManager(), row2Tuple);

            PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(partAssignments);

            PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
                    new PendingComparableValuesTracker<>(clocks.get(assignment).now());
            PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);

            PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage);
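
            // Handlers that apply row and index updates (including garbage collection) to the test partition storage.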
            IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
                    DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()))
            );

            StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
                    partId,
                    partitionDataStorage,
                    gcConfig,
                    mock(LowWatermark.class),
                    indexUpdateHandler,
                    new GcUpdateHandler(partitionDataStorage, safeTime, indexUpdateHandler)
            );

            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
                    clusterServices.get(assignment),
                    logicalTopologyService(clusterServices.get(assignment)),
                    Loza.FACTORY,
                    new RaftGroupEventsClientListener()
            );
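
            // Start the Raft group node for this partition; once it is up, start the corresponding partition replica.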
            CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).startRaftGroupNode(
                    new RaftNodeId(grpId, configuration.peer(assignment)),
                    configuration,
                    new PartitionListener(
                            partitionDataStorage,
                            storageUpdateHandler,
                            txStateStorage,
                            safeTime,
                            storageIndexTracker
                    ),
                    RaftGroupEventsListener.noopLsnr,
                    topologyAwareRaftGroupServiceFactory
            ).thenAccept(
                    raftSvc -> {
                        try {
                            DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor);

                            replicaManagers.get(assignment).startReplica(
                                    new TablePartitionId(tblId, partId),
                                    completedFuture(null),
                                    new PartitionReplicaListener(
                                            mvPartStorage,
                                            raftSvc,
                                            txManagers.get(assignment),
                                            txManagers.get(assignment).lockManager(),
                                            Runnable::run,
                                            partId,
                                            tblId,
                                            () -> Map.of(pkLocker.id(), pkLocker),
                                            pkStorage,
                                            Map::of,
                                            clocks.get(assignment),
                                            safeTime,
                                            txStateStorage,
                                            placementDriver,
                                            storageUpdateHandler,
                                            new DummySchemas(schemaManager),
                                            completedFuture(schemaManager),
                                            consistentIdToNode.apply(assignment),
                                            mvTableStorage,
                                            mock(IndexBuilder.class),
                                            tablesConfig
                                    ),
                                    raftSvc,
                                    storageIndexTracker
                            );
                        } catch (NodeStoppingException e) {
                            fail("Unexpected node stopping", e);
                        }
                    }
            );

            partitionReadyFutures.add(partitionReadyFuture);
        }

        PeersAndLearners membersConf = PeersAndLearners.fromConsistentIds(partAssignments);
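
        // Create a Raft client for the partition: either on the dedicated client node or on the node
        // that currently hosts the group leader.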
        if (startClient()) {
            RaftGroupService service = RaftGroupServiceImpl
                    .start(grpId, client, FACTORY, raftConfiguration, membersConf, true, executor)
                    .get(5, TimeUnit.SECONDS);

            clients.put(p, service);
        } else {
            // Create a temporary client to find the leader address.
            ClusterService tmpSvc = cluster.get(0);

            RaftGroupService service = RaftGroupServiceImpl
                    .start(grpId, tmpSvc, FACTORY, raftConfiguration, membersConf, true, executor)
                    .get(5, TimeUnit.SECONDS);

            Peer leader = service.leader();

            service.shutdown();

            ClusterService leaderSrv = cluster.stream()
                    .filter(cluster -> cluster.topologyService().localMember().name().equals(leader.consistentId()))
                    .findAny()
                    .orElseThrow();

            RaftGroupService leaderClusterSvc = RaftGroupServiceImpl
                    .start(grpId, leaderSrv, FACTORY, raftConfiguration, membersConf, true, executor)
                    .get(5, TimeUnit.SECONDS);

            clients.put(p, leaderClusterSvc);
        }
    }
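
    // Wait for all partition replicas to start.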
    CompletableFuture.allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();

    return clients;
}