in modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java [630:843]
public TableViewInternal startTable(String tableName, SchemaDescriptor schemaDescriptor) throws Exception {
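
    // Register a mocked table descriptor under a fresh table id in the mocked catalog; the hard-coded
    // zone id is reused below for zone-based replication group ids.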
    int predefinedZoneId = 2;

    int tableId = globalCatalogId.getAndIncrement();

    CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
    when(tableDescriptor.id()).thenReturn(tableId);
    when(tableDescriptor.tableVersion()).thenReturn(SCHEMA_VERSION);
    lenient().when(catalog.table(eq(tableId))).thenReturn(tableDescriptor);
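
    // Calculate assignments for a single partition over the configured replica factor and derive the
    // replication group id per partition: zone-based when colocation is enabled, table-based otherwise.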
    List<Set<Assignment>> calculatedAssignments = calculateAssignments(
            cluster.stream().map(ItTxTestCluster::extractConsistentId).collect(toList()),
            1,
            replicas
    );

    List<Set<String>> assignments = calculatedAssignments.stream()
            .map(a -> a.stream().map(Assignment::consistentId).collect(toSet()))
            .collect(toList());

    List<ReplicationGroupId> grpIds = IntStream.range(0, assignments.size())
            .mapToObj(i -> enabledColocation() ? new ZonePartitionId(predefinedZoneId, i) : new TablePartitionId(tableId, i))
            .collect(toList());
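
    // Collect per-replica startup futures below; also register a mocked primary-key index for the table
    // in the catalog mock.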
    List<CompletableFuture<?>> partitionReadyFutures = new ArrayList<>();

    int indexId = globalCatalogId.getAndIncrement();

    CatalogIndexDescriptor pkCatalogIndexDescriptor = mock(CatalogIndexDescriptor.class);
    when(pkCatalogIndexDescriptor.id()).thenReturn(indexId);
    when(catalog.indexes(eq(tableId))).thenReturn(List.of(pkCatalogIndexDescriptor));
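
    // Create the client-side internal table facade. Table-level storages are mocked here; real
    // per-partition test storages are created for each replica below.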
    InternalTableImpl internalTable = new InternalTableImpl(
            QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, tableName),
            predefinedZoneId,
            tableId,
            1, // number of partitions.
            nodeResolver,
            clientTxManager,
            mock(MvTableStorage.class),
            mock(TxStateStorage.class),
            startClient ? clientReplicaSvc : replicaServices.get(localNodeName),
            startClient ? clientClockService : clockServices.get(localNodeName),
            timestampTracker,
            placementDriver,
            clientTransactionInflights,
            null,
            mock(StreamerReceiverRunner.class),
            () -> 10_000L,
            () -> 10_000L
    );
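
    // Wrap the internal table into a TableImpl with a constant schema version and register it in the
    // fixture's table map under its name.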
    TableImpl table = new TableImpl(
            internalTable,
            new DummySchemaManagerImpl(schemaDescriptor),
            clientTxManager.lockManager(),
            new ConstantSchemaVersions(SCHEMA_VERSION),
            mock(IgniteSql.class),
            pkCatalogIndexDescriptor.id()
    );

    tables.put(tableName, table);
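
    // For every partition and every node in its assignment, start a dedicated replica with its own
    // storages, listeners and transaction-state resolver.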
    for (int p = 0; p < assignments.size(); p++) {
        Set<String> partAssignments = assignments.get(p);

        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(partAssignments);

        ReplicationGroupId grpId = grpIds.get(p);

        for (String assignment : partAssignments) {
            int partId = p;

            var mvPartStorage = new TestMvPartitionStorage(partId);
            var txStateStorage = txStateStorages.get(assignment);

            TxMessageSender txMessageSender = new TxMessageSender(
                    clusterServices.get(assignment).messagingService(),
                    replicaServices.get(assignment),
                    clockServices.get(assignment)
            );

            var transactionStateResolver = new TransactionStateResolver(
                    txManagers.get(assignment),
                    clockServices.get(assignment),
                    nodeResolver,
                    clusterServices.get(assignment).messagingService(),
                    placementDriver,
                    txMessageSender
            );

            transactionStateResolver.start();
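
            // Wire up the primary-key hash index for this partition: a key extractor, a test hash index
            // storage over the key columns, and the index locker used by transactional operations.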
            ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);

            StorageHashIndexDescriptor pkIndexDescriptor = mock(StorageHashIndexDescriptor.class);

            when(pkIndexDescriptor.columns()).then(invocation -> Collections.nCopies(
                    schemaDescriptor.keyColumns().size(),
                    mock(StorageHashIndexColumnDescriptor.class)
            ));

            Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
                    indexId,
                    new TestHashIndexStorage(partId, pkIndexDescriptor),
                    row2Tuple
            ));

            IndexLocker pkLocker = new HashIndexLocker(indexId, true, txManagers.get(assignment).lockManager(), row2Tuple);
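
            // Safe-time and storage-index trackers plus the handlers that apply updates to the partition
            // storage and its indexes.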
            SafeTimeValuesTracker safeTime = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
            PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);

            PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, partId, mvPartStorage);

            IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
                    DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()))
            );

            StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
                    partId,
                    partitionDataStorage,
                    indexUpdateHandler,
                    replicationConfiguration
            );
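
            // Create the Raft group listener (the replicated state machine) for this partition on this
            // node, backed by a per-node dummy schema manager.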
            DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor);

            RaftGroupListener raftGroupListener = getOrCreateAndPopulateRaftGroupListener(
                    assignment,
                    predefinedZoneId,
                    partId,
                    tableId,
                    partitionDataStorage,
                    storageUpdateHandler,
                    txStateStorage,
                    safeTime,
                    storageIndexTracker,
                    catalogService,
                    schemaManager
            );
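
            // Replica listener factory: given the partition's Raft client, it produces the listener that
            // serves replica requests for this partition.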
            Function<RaftGroupService, ReplicaListener> replicaListenerProvider =
                    raftClient -> getOrCreateAndPopulateReplicaListenerProvider(
                            assignment,
                            mvPartStorage,
                            raftClient,
                            txManagers.get(assignment),
                            new ZonePartitionId(predefinedZoneId, partId),
                            tableId,
                            () -> Map.of(pkLocker.id(), pkLocker),
                            pkStorage,
                            Map::of,
                            clockServices.get(assignment),
                            safeTime,
                            txStateStorage,
                            transactionStateResolver,
                            storageUpdateHandler,
                            new DummyValidationSchemasSource(schemaManager),
                            nodeResolver.getByConsistentId(assignment),
                            new AlwaysSyncedSchemaSyncService(),
                            catalogService,
                            placementDriver,
                            nodeResolver,
                            cursorRegistries.get(assignment),
                            schemaManager
                    );
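
            // Start the replica for this partition on this node and remember its readiness future.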
            CompletableFuture<?> partitionReadyFuture = startReplica(
                    assignment,
                    raftGroupListener,
                    replicaListenerProvider,
                    storageIndexTracker,
                    grpId,
                    configuration
            );

            partitionReadyFutures.add(partitionReadyFuture);
        }
    }
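
    // Wait for all partition replicas on all nodes to finish starting.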
    allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
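
    // Propagate the primary-replica lease of each replication group to its Raft group, so replicas learn
    // which node holds the lease.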
    for (int p = 0; p < assignments.size(); p++) {
        ReplicationGroupId grpId = grpIds.get(p);

        CompletableFuture<ReplicaMeta> primaryFuture =
                placementDriver.getPrimaryReplica(grpId, clockServices.values().iterator().next().now());

        // TestPlacementDriver always returns completed futures.
        assert primaryFuture.isDone();

        ReplicaMeta primary = primaryFuture.join();
        assert primary.getLeaseholderId() != null;

        PrimaryReplicaChangeCommand cmd = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
                .leaseStartTime(primary.getStartTime().longValue())
                .primaryReplicaNodeId(primary.getLeaseholderId())
                .primaryReplicaNodeName(primary.getLeaseholder())
                .build();

        CompletableFuture<RaftGroupService> raftClientFuture = getRaftClientForGroup(grpId);
        assertThat(raftClientFuture, willCompleteSuccessfully());

        CompletableFuture<?> primaryReplicaChangePropagationFuture = raftClientFuture.join().run(cmd);

        partitionReadyFutures.add(primaryReplicaChangePropagationFuture);
    }

    allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
    return table;
}
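
// Hypothetical usage sketch (not part of the fixture itself): a transaction test would typically prepare
// the cluster fixture, create a table through startTable(...), and then run transactional operations
// against it. The name SCHEMA_DESCRIPTOR and the surrounding setup are assumptions for illustration only.
//
//     ItTxTestCluster txTestCluster = ...;                                     // built in the test's setup
//     TableViewInternal table = txTestCluster.startTable("TEST_TBL", SCHEMA_DESCRIPTOR);
//     // ... begin a transaction and perform puts/gets against 'table' ...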