in modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java [111:242]
public void before() throws Exception {
    // Builds a test cluster whose transaction manager records every write intent switch future and whose
    // partition replica listeners never process write intent switch (cleanup) requests, then starts both tables.
    txTestCluster = new ItTxTestCluster(
            testInfo,
            raftConfiguration,
            txConfiguration,
            systemDistributedConfiguration,
            workDir,
            nodes(),
            replicas(),
            startClient(),
            timestampTracker,
            replicationConfiguration
    ) {
        @Override
        protected TxManagerImpl newTxManager(
                ClusterService clusterService,
                ReplicaService replicaSvc,
                ClockService clockService,
                TransactionIdGenerator generator,
                ClusterNode node,
                PlacementDriver placementDriver,
                RemotelyTriggeredResourceRegistry resourcesRegistry,
                TransactionInflights transactionInflights,
                LowWatermark lowWatermark
        ) {
            return new TxManagerImpl(
                    txConfiguration,
                    systemDistributedConfiguration,
                    clusterService,
                    replicaSvc,
                    new HeapLockManager(systemLocalConfiguration),
                    clockService,
                    generator,
                    placementDriver,
                    () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                    new TestLocalRwTxCounter(),
                    resourcesRegistry,
                    transactionInflights,
                    lowWatermark,
                    commonExecutor
            ) {
                @Override
                public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable runnable) {
                    // Delegate to the real implementation, but remember the resulting future in cleanupFutures.
                    CompletableFuture<Void> switchFuture = super.executeWriteIntentSwitchAsync(runnable);

                    cleanupFutures.add(switchFuture);

                    return switchFuture;
                }
            };
        }

        @Override
        protected PartitionReplicaListener newReplicaListener(
                MvPartitionStorage mvDataStorage,
                RaftGroupService raftClient,
                TxManager txManager,
                Executor scanRequestExecutor,
                PartitionGroupId replicationGroupId,
                int tableId,
                Supplier<Map<Integer, IndexLocker>> indexesLockers,
                Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
                Supplier<Map<Integer, TableSchemaAwareIndexStorage>> secondaryIndexStorages,
                ClockService clockService,
                PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
                TxStatePartitionStorage txStatePartitionStorage,
                TransactionStateResolver transactionStateResolver,
                StorageUpdateHandler storageUpdateHandler,
                ValidationSchemasSource validationSchemasSource,
                ClusterNode localNode,
                SchemaSyncService schemaSyncService,
                CatalogService catalogService,
                PlacementDriver placementDriver,
                ClusterNodeResolver clusterNodeResolver,
                RemotelyTriggeredResourceRegistry resourcesRegistry,
                SchemaRegistry schemaRegistry
        ) {
            // NOTE(review): scanRequestExecutor is not forwarded; Runnable::run is passed to the listener instead.
            return new PartitionReplicaListener(
                    mvDataStorage,
                    raftClient,
                    txManager,
                    txManager.lockManager(),
                    Runnable::run,
                    replicationGroupId,
                    tableId,
                    indexesLockers,
                    pkIndexStorage,
                    secondaryIndexStorages,
                    clockService,
                    safeTime,
                    txStatePartitionStorage,
                    transactionStateResolver,
                    storageUpdateHandler,
                    validationSchemasSource,
                    localNode,
                    schemaSyncService,
                    catalogService,
                    placementDriver,
                    clusterNodeResolver,
                    resourcesRegistry,
                    schemaRegistry,
                    mock(IndexMetaStorage.class),
                    lowWatermark,
                    mock(FailureProcessor.class)
            ) {
                @Override
                public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) {
                    // Everything except a write intent switch request takes the regular processing path.
                    if (!(request instanceof WriteIntentSwitchReplicaRequest)) {
                        return super.invoke(request, senderId);
                    }

                    WriteIntentSwitchReplicaRequest switchRequest = (WriteIntentSwitchReplicaRequest) request;

                    logger().info("Dropping cleanup request: {}", request);

                    // The request is not passed to the parent listener; only the transaction locks are released
                    // before answering with an empty, already completed result.
                    releaseTxLocks(switchRequest.txId(), txManager.lockManager());

                    return completedFuture(new ReplicaResult(null, null));
                }
            };
        }
    };

    txTestCluster.prepareCluster();

    this.igniteTransactions = txTestCluster.igniteTransactions;

    accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACCOUNTS_SCHEMA);
    customers = txTestCluster.startTable(CUST_TABLE_NAME, CUSTOMERS_SCHEMA);

    log.info("Tables have been started");
}