in modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java [192:344]
public DummyInternalTableImpl(
        ReplicaService replicaSvc,
        MvPartitionStorage mvPartStorage,
        @Nullable TxManager txManager,
        boolean crossTableUsage,
        PlacementDriver placementDriver,
        SchemaDescriptor schema
) {
    // Wires a test table around one mocked RaftGroupService registered under PART_ID.
    // When no transaction manager is supplied, a TxManagerImpl backed by a HeapLockManager is created instead.
    super(
            "test",
            nextTableId.getAndIncrement(),
            Int2ObjectMaps.singleton(PART_ID, mock(RaftGroupService.class)),
            1,
            name -> mock(ClusterNode.class),
            txManager == null
                    ? new TxManagerImpl(replicaSvc, new HeapLockManager(), CLOCK, new TransactionIdGenerator(0xdeadbeef))
                    : txManager,
            mock(MvTableStorage.class),
            new TestTxStateTableStorage(),
            replicaSvc,
            CLOCK
    );

    // Fetch the mock by the same PART_ID key it was registered under in super(...) and that is used for the
    // PartitionReplicaListener lookup below; a literal key would only work while PART_ID happens to be 0.
    RaftGroupService svc = raftGroupServiceByPartitionId.get(PART_ID);

    // With crossTableUsage the group id is derived from this table's own id; otherwise crossTableGroupId is used.
    groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : crossTableGroupId;

    // All stubs are lenient so that tests which never hit them are not failed by Mockito's strict-stubs checks.
    lenient().doReturn(groupId).when(svc).groupId();

    // A randomly named peer is reported as the leader, with a fixed term of 1.
    Peer leaderPeer = new Peer(UUID.randomUUID().toString());
    lenient().doReturn(leaderPeer).when(svc).leader();
    lenient().doReturn(completedFuture(new LeaderWithTerm(leaderPeer, 1L))).when(svc).refreshAndGetLeaderWithTerm();

    if (!crossTableUsage) {
        // Delegate replica requests directly to replica listener.
        // NOTE(review): this stubs the caller-supplied replicaSvc, so it has to be a Mockito mock/spy in this mode.
        // replicaListener is read when the stub fires, not here, so assigning it further down is fine.
        lenient().doAnswer(invocationOnMock -> replicaListener.invoke(invocationOnMock.getArgument(1)))
                .when(replicaSvc).invoke(any(ClusterNode.class), any());
    }

    // Source of increasing command indexes, one per run(...) invocation on the mocked Raft service.
    AtomicLong raftIndex = new AtomicLong();

    // Delegate directly to listener.
    lenient().doAnswer(
            invocationClose -> {
                Command cmd = invocationClose.getArgument(0);

                long commandIndex = raftIndex.incrementAndGet();

                // Future returned to the caller of run(...); completed from the closure's result callback.
                CompletableFuture<Serializable> res = new CompletableFuture<>();

                // All read commands are handled directly through partition replica listener.
                // Hence the unconditional cast to WriteCommand in command() below.
                CommandClosure<WriteCommand> clo = new CommandClosure<>() {
                    /** {@inheritDoc} */
                    @Override
                    public long index() {
                        return commandIndex;
                    }

                    /** {@inheritDoc} */
                    @Override
                    public WriteCommand command() {
                        return (WriteCommand) cmd;
                    }

                    /** {@inheritDoc} */
                    @Override
                    public void result(@Nullable Serializable r) {
                        // A Throwable result is surfaced as an exceptional completion rather than a value.
                        if (r instanceof Throwable) {
                            res.completeExceptionally((Throwable) r);
                        } else {
                            res.complete(r);
                        }
                    }
                };

                try {
                    partitionListener.onWrite(List.of(clo).iterator());
                } catch (Throwable e) {
                    // Anything thrown synchronously by the listener is reported through the future,
                    // wrapped in a TransactionException, instead of propagating out of the stub.
                    res.completeExceptionally(new TransactionException(e));
                }

                return res;
            }
    ).when(svc).run(any());

    int tableId = tableId();

    // Single index id shared by the primary key storage and the primary key locker.
    int indexId = 1;

    Function<BinaryRow, BinaryTuple> row2Tuple = BinaryRowConverter.keyExtractor(schema);

    Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
            indexId,
            new TestHashIndexStorage(PART_ID, null),
            row2Tuple
    ));

    IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2Tuple);

    // The safe time tracker is a mock; its waitFor/current stubs are installed further down.
    safeTime = mock(PendingComparableValuesTracker.class);

    PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage);

    // The primary key storage is the only index storage exposed to the update handlers.
    TableIndexStoragesSupplier indexes = createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));

    // Mocked GC configuration whose onUpdateBatchSize() value is fixed at 5.
    GcConfiguration gcConfig = mock(GcConfiguration.class);
    ConfigurationValue<Integer> gcBatchSizeValue = mock(ConfigurationValue.class);
    lenient().when(gcBatchSizeValue.value()).thenReturn(5);
    lenient().when(gcConfig.onUpdateBatchSize()).thenReturn(gcBatchSizeValue);

    IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(indexes);

    // The same handler instance is passed to both the replica listener and the partition listener below.
    StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
            PART_ID,
            partitionDataStorage,
            gcConfig,
            mock(LowWatermark.class),
            indexUpdateHandler,
            new GcUpdateHandler(partitionDataStorage, safeTime, indexUpdateHandler)
    );

    DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema);

    // Runnable::run makes the listener's executor run tasks inline on the calling thread.
    replicaListener = new PartitionReplicaListener(
            mvPartStorage,
            raftGroupServiceByPartitionId.get(PART_ID),
            this.txManager,
            this.txManager.lockManager(),
            Runnable::run,
            PART_ID,
            tableId,
            () -> Map.of(pkLocker.id(), pkLocker),
            pkStorage,
            Map::of,
            CLOCK,
            safeTime,
            txStateStorage().getOrCreateTxStateStorage(PART_ID),
            placementDriver,
            storageUpdateHandler,
            new DummySchemas(schemaManager),
            completedFuture(schemaManager),
            mock(ClusterNode.class),
            mock(MvTableStorage.class),
            mock(IndexBuilder.class),
            mock(TablesConfiguration.class)
    );

    // Safe time never blocks: waitFor(...) is already completed and current() is a fixed timestamp.
    lenient().when(safeTime.waitFor(any())).thenReturn(completedFuture(null));
    lenient().when(safeTime.current()).thenReturn(new HybridTimestamp(1, 0));

    // The partition listener gets its own TestPartitionDataStorage wrapper over the same mvPartStorage.
    partitionListener = new PartitionListener(
            new TestPartitionDataStorage(mvPartStorage),
            storageUpdateHandler,
            txStateStorage().getOrCreateTxStateStorage(PART_ID),
            safeTime,
            new PendingComparableValuesTracker<>(0L)
    );
}