in modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java [277:590]
public DummyInternalTableImpl(
ReplicaService replicaSvc,
MvPartitionStorage mvPartStorage,
boolean crossTableUsage,
@Nullable TransactionStateResolver transactionStateResolver,
SchemaDescriptor schema,
HybridTimestampTracker tracker,
PlacementDriver placementDriver,
ReplicationConfiguration replicationConfiguration,
TransactionConfiguration txConfiguration,
SystemDistributedConfiguration systemCfg,
RemotelyTriggeredResourceRegistry resourcesRegistry,
TransactionInflights transactionInflights
) {
super(
QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, "test"),
ZONE_ID, // zone id.
nextTableId.getAndIncrement(), // table id.
1, // number of partitions.
new SingleClusterNodeResolver(LOCAL_NODE),
txManager(replicaSvc, placementDriver, txConfiguration, systemCfg, resourcesRegistry),
mock(MvTableStorage.class),
new TestTxStateStorage(),
replicaSvc,
CLOCK_SERVICE,
tracker,
placementDriver,
transactionInflights,
null,
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L
);
RaftGroupService svc = mock(RaftGroupService.class);
groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : crossTableGroupId;
lenient().doReturn(groupId).when(svc).groupId();
Peer leaderPeer = new Peer(UUID.randomUUID().toString());
lenient().doReturn(leaderPeer).when(svc).leader();
lenient().doReturn(completedFuture(new LeaderWithTerm(leaderPeer, 1L))).when(svc).refreshAndGetLeaderWithTerm();
if (!crossTableUsage) {
// Delegate replica requests directly to replica listener.
lenient()
.doAnswer(invocationOnMock -> {
ClusterNode node = invocationOnMock.getArgument(0);
return replicaListener.invoke(invocationOnMock.getArgument(1), node.id()).thenApply(ReplicaResult::result);
})
.when(replicaSvc).invoke(any(ClusterNode.class), any());
lenient()
.doAnswer(invocationOnMock -> {
String nodeConsistenId = invocationOnMock.getArgument(0);
UUID nodeId = deriveUuidFrom(nodeConsistenId);
return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId).thenApply(ReplicaResult::result);
})
.when(replicaSvc).invoke(anyString(), any());
lenient()
.doAnswer(invocationOnMock -> {
ClusterNode node = invocationOnMock.getArgument(0);
return replicaListener.invoke(invocationOnMock.getArgument(1), node.id())
.thenApply(DummyInternalTableImpl::dummyTimestampAwareResponse);
})
.when(replicaSvc).invokeRaw(any(ClusterNode.class), any());
lenient()
.doAnswer(invocationOnMock -> {
String nodeConsistenId = invocationOnMock.getArgument(0);
UUID nodeId = deriveUuidFrom(nodeConsistenId);
return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId)
.thenApply(DummyInternalTableImpl::dummyTimestampAwareResponse);
})
.when(replicaSvc).invokeRaw(anyString(), any());
}
AtomicLong raftIndex = new AtomicLong(1);
// Delegate directly to listener.
lenient().doAnswer(
invocationClose -> {
synchronized (raftServiceMutex) {
Command cmd = invocationClose.getArgument(0);
long commandIndex = raftIndex.incrementAndGet();
HybridTimestamp safeTs = cmd instanceof SafeTimePropagatingCommand ? CLOCK.now() : null;
CompletableFuture<Serializable> res = new CompletableFuture<>();
// All read commands are handled directly throw partition replica listener.
CommandClosure<WriteCommand> clo = new CommandClosure<>() {
/** {@inheritDoc} */
@Override
public long index() {
return commandIndex;
}
/** {@inheritDoc} */
@Override
public HybridTimestamp safeTimestamp() {
return safeTs;
}
/** {@inheritDoc} */
@Override
public @Nullable WriteCommand command() {
return (WriteCommand) cmd;
}
/** {@inheritDoc} */
@Override
public void result(@Nullable Serializable r) {
if (r instanceof Throwable) {
res.completeExceptionally((Throwable) r);
} else {
res.complete(r);
}
}
};
try {
partitionListener.onWrite(List.of(clo).iterator());
} catch (Throwable e) {
res.completeExceptionally(new TransactionException(INTERNAL_ERR, e));
}
return res;
}
}
).when(svc).run(any());
int tableId = tableId();
int indexId = 1;
ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schema);
StorageHashIndexDescriptor pkIndexDescriptor = mock(StorageHashIndexDescriptor.class);
when(pkIndexDescriptor.columns()).then(
invocation -> Collections.nCopies(schema.keyColumns().size(), mock(StorageHashIndexColumnDescriptor.class))
);
Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
indexId,
new TestHashIndexStorage(PART_ID, pkIndexDescriptor),
row2Tuple
));
IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2Tuple);
safeTime = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage);
TableIndexStoragesSupplier indexes = createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));
IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(indexes);
StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
PART_ID,
partitionDataStorage,
indexUpdateHandler,
replicationConfiguration
);
DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema);
Catalog catalog = mock(Catalog.class);
CatalogService catalogService = mock(CatalogService.class);
CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);
lenient().when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
lenient().when(catalog.table(anyInt())).thenReturn(tableDescriptor);
lenient().when(tableDescriptor.tableVersion()).thenReturn(1);
CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class);
lenient().when(indexDescriptor.id()).thenReturn(pkStorage.get().id());
lenient().when(catalog.indexes(anyInt())).thenReturn(List.of(indexDescriptor));
ZonePartitionId zonePartitionId = new ZonePartitionId(ZONE_ID, PART_ID);
TablePartitionId tablePartitionId = new TablePartitionId(tableId, PART_ID);
var tableReplicaListener = new PartitionReplicaListener(
mvPartStorage,
svc,
this.txManager,
this.txManager.lockManager(),
Runnable::run,
tablePartitionId,
tableId,
() -> Map.of(pkLocker.id(), pkLocker),
pkStorage,
Map::of,
CLOCK_SERVICE,
safeTime,
txStateStorage().getOrCreatePartitionStorage(PART_ID),
transactionStateResolver,
storageUpdateHandler,
new DummyValidationSchemasSource(schemaManager),
LOCAL_NODE,
new AlwaysSyncedSchemaSyncService(),
catalogService,
placementDriver,
mock(ClusterNodeResolver.class),
resourcesRegistry,
schemaManager,
mock(IndexMetaStorage.class),
new TestLowWatermark(),
mock(FailureProcessor.class)
);
if (enabledColocation) {
ZonePartitionReplicaListener zoneReplicaListener = new ZonePartitionReplicaListener(
txStateStorage().getOrCreatePartitionStorage(PART_ID),
CLOCK_SERVICE,
this.txManager,
new DummyValidationSchemasSource(schemaManager),
new AlwaysSyncedSchemaSyncService(),
catalogService,
placementDriver,
mock(ClusterNodeResolver.class),
svc,
mock(FailureProcessor.class),
LOCAL_NODE,
zonePartitionId
);
zoneReplicaListener.addTableReplicaProcessor(tableId, raftClient -> tableReplicaListener);
replicaListener = zoneReplicaListener;
} else {
replicaListener = tableReplicaListener;
}
HybridClock clock = new HybridClockImpl();
ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(clock.current());
PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
var tablePartitionListener = new PartitionListener(
this.txManager,
new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage),
storageUpdateHandler,
txStateStorage().getOrCreatePartitionStorage(PART_ID),
safeTime,
storageIndexTracker,
catalogService,
schemaManager,
mock(IndexMetaStorage.class),
LOCAL_NODE.id(),
mock(MinimumRequiredTimeCollectorService.class),
mock(Executor.class),
placementDriver,
clockService,
enabledColocation ? zonePartitionId : tablePartitionId
);
if (enabledColocation) {
ZonePartitionRaftListener zoneRaftListener = new ZonePartitionRaftListener(
zonePartitionId,
txStateStorage().getOrCreatePartitionStorage(PART_ID),
this.txManager,
safeTime,
storageIndexTracker,
new NoOpPartitionsSnapshots(),
mock(Executor.class)
);
zoneRaftListener.addTableProcessor(tableId, tablePartitionListener);
partitionListener = zoneRaftListener;
} else {
partitionListener = tablePartitionListener;
}
// Update(All)Command handling requires both information about raft group topology and the primary replica,
// thus onConfigurationCommited and primaryReplicaChangeCommand are called.
{
partitionListener.onConfigurationCommitted(
new RaftGroupConfiguration(
1,
1,
List.of(LOCAL_NODE.name()),
Collections.emptyList(),
null,
null
),
1,
1
);
CompletableFuture<ReplicaMeta> primaryMetaFuture = placementDriver.getPrimaryReplica(groupId, CLOCK.now());
assertThat(primaryMetaFuture, willCompleteSuccessfully());
ReplicaMeta primary = primaryMetaFuture.join();
PrimaryReplicaChangeCommand primaryReplicaChangeCommand = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
.leaseStartTime(primary.getStartTime().longValue())
.primaryReplicaNodeId(primary.getLeaseholderId())
.primaryReplicaNodeName(primary.getLeaseholder())
.build();
assertThat(svc.run(primaryReplicaChangeCommand), willCompleteSuccessfully());
}
}