in modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java [196:375]
static void beforeAllTests() {
    /*
     * Builds the shared fixture for the colocation tests:
     *  - a TxManagerImpl whose finish() is short-circuited to an already completed future;
     *  - one mocked RaftGroupService per partition that records every command it receives in CMDS_MAP;
     *  - a mocked ReplicaService that converts replica requests into update commands and forwards them
     *    to the raft mock registered for the request's replication group;
     *  - the InternalTableImpl under test, wired to all of the above.
     */
    ClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;

    // Deep-stubbed cluster service: explicit messaging service mock, local topology member is the dummy node.
    ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
    MessagingService messagingService = mock(MessagingService.class);
    when(clusterService.messagingService()).thenReturn(messagingService);
    when(clusterService.topologyService().localMember()).thenReturn(localNode);

    ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);

    RemotelyTriggeredResourceRegistry resourcesRegistry = new RemotelyTriggeredResourceRegistry();

    PlacementDriver placementDriver = new TestPlacementDriver(localNode);

    HybridClock clock = new HybridClockImpl();
    ClockService clockService = new TestClockService(clock);

    TransactionInflights transactionInflights = new TransactionInflights(placementDriver, clockService);

    txManager = new TxManagerImpl(
            txConfiguration,
            systemDistributedConfiguration,
            clusterService,
            replicaService,
            new HeapLockManager(systemLocalConfiguration),
            clockService,
            new TransactionIdGenerator(0xdeadbeef),
            placementDriver,
            () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
            new TestLocalRwTxCounter(),
            resourcesRegistry,
            transactionInflights,
            new TestLowWatermark(),
            commonExecutor
    ) {
        /** Finishing a transaction is a no-op in this fixture: an already completed future is returned. */
        @Override
        public CompletableFuture<Void> finish(
                HybridTimestampTracker observableTimestampTracker,
                ReplicationGroupId commitPartition,
                boolean commitIntent,
                boolean timeoutExceeded,
                Map<ReplicationGroupId, PendingTxPartitionEnlistment> enlistedGroups,
                UUID txId
        ) {
            return nullCompletedFuture();
        }
    };

    assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully());

    // NOTE(review): partRafts is filled below but never read inside this method.
    Int2ObjectMap<RaftGroupService> partRafts = new Int2ObjectOpenHashMap<>();
    Map<ReplicationGroupId, RaftGroupService> groupRafts = new HashMap<>();

    for (int partition = 0; partition < PARTS; partition++) {
        final int partitionId = partition;

        RaftGroupService raftService = mock(RaftGroupService.class);

        doAnswer(invocation -> {
            Command command = (Command) invocation.getArguments()[0];

            // Accumulate the commands seen by this partition. Assuming CMDS_MAP follows java.util.Map#merge,
            // the first lambda argument is the set already stored and the second is the freshly created one.
            CMDS_MAP.merge(partitionId, new HashSet<>(Set.of(command)), (stored, fresh) -> {
                fresh.addAll(stored);

                return fresh;
            });

            if (!(command instanceof UpdateAllCommand)) {
                return trueCompletedFuture();
            }

            // Batch updates are answered with one placeholder row per updated row id.
            return completedFuture(((UpdateAllCommand) command).rowsToUpdate().keySet().stream()
                    .map(uuid -> new NullBinaryRow())
                    .collect(Collectors.toList()));
        }).when(raftService).run(any());

        partRafts.put(partition, raftService);

        // The replication group key depends on whether colocation is enabled.
        if (enabledColocation()) {
            groupRafts.put(new ZonePartitionId(ZONE_ID, partition), raftService);
        } else {
            groupRafts.put(new TablePartitionId(TABLE_ID, partition), raftService);
        }
    }

    // Turns a replica request into the matching update command and runs it on the raft mock of the target group.
    Answer<CompletableFuture<?>> replicaAnswer = invocation -> {
        String nodeName = invocation.getArgument(0);
        ClusterNode coordinator = clusterNodeByName(nodeName);

        ReplicaRequest request = invocation.getArgument(1);

        // Partition 0 is always used as the commit partition.
        ReplicationGroupIdMessage commitPartitionId;

        if (enabledColocation()) {
            commitPartitionId = toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, new ZonePartitionId(ZONE_ID, 0));
        } else {
            commitPartitionId = toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, new TablePartitionId(TABLE_ID, 0));
        }

        RaftGroupService targetRaft = groupRafts.get(request.groupId().asReplicationGroupId());

        if (!(request instanceof ReadWriteMultiRowReplicaRequest)) {
            // Anything that is not a multi-row request must be a single-row one.
            assertThat(request, is(instanceOf(ReadWriteSingleRowReplicaRequest.class)));

            ReadWriteSingleRowReplicaRequest singleRowRequest = (ReadWriteSingleRowReplicaRequest) request;

            return targetRaft.run(PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommand()
                    .tableId(TABLE_ID)
                    .commitPartitionId(commitPartitionId)
                    .rowUuid(UUID.randomUUID())
                    .messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                            .binaryRowMessage(binaryRowMessage(singleRowRequest.binaryTuple(), singleRowRequest))
                            .build())
                    .txId(TestTransactionIds.newTransactionId())
                    .txCoordinatorId(coordinator.id())
                    .initiatorTime(clock.now())
                    .build());
        }

        ReadWriteMultiRowReplicaRequest multiRowRequest = (ReadWriteMultiRowReplicaRequest) request;

        // Every tuple of the batch is stored under a newly generated id.
        Map<UUID, TimedBinaryRowMessage> rowsById = multiRowRequest.binaryTuples().stream()
                .collect(toMap(
                        tuple -> TestTransactionIds.newTransactionId(),
                        tuple -> PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                                .binaryRowMessage(binaryRowMessage(tuple, multiRowRequest))
                                .build()
                ));

        return targetRaft.run(PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommand()
                .tableId(TABLE_ID)
                .commitPartitionId(commitPartitionId)
                .messageRowsToUpdate(rowsById)
                .txId(UUID.randomUUID())
                .txCoordinatorId(coordinator.id())
                .initiatorTime(clock.now())
                .build());
    };

    when(replicaService.invoke(any(String.class), any())).thenAnswer(replicaAnswer);

    // The raw variant wraps the same result into a response that also carries the current clock reading.
    when(replicaService.invokeRaw(any(String.class), any())).thenAnswer(
            invocation -> replicaAnswer.answer(invocation).thenApply(payload -> new TimestampAwareReplicaResponse() {
                @Override
                public @Nullable Object result() {
                    return payload;
                }

                @Override
                public @Nullable HybridTimestamp timestamp() {
                    return clock.now();
                }

                @Override
                public MessageSerializer<NetworkMessage> serializer() {
                    return null;
                }

                @Override
                public short messageType() {
                    return 0;
                }

                @Override
                public short groupType() {
                    return 0;
                }

                @Override
                public NetworkMessage clone() {
                    return null;
                }
            }));

    intTable = new InternalTableImpl(
            QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, "TEST"),
            ZONE_ID, // Zone ID.
            TABLE_ID, // Table ID.
            PARTS, // Partition count.
            new SingleClusterNodeResolver(localNode),
            txManager,
            mock(MvTableStorage.class),
            new TestTxStateStorage(),
            replicaService,
            clockService,
            observableTimestampTracker,
            new TestPlacementDriver(localNode), // A separate placement driver instance, not the one given to txManager.
            transactionInflights,
            null,
            mock(StreamerReceiverRunner.class),
            () -> 10_000L,
            () -> 10_000L
    );
}