in modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java [308:825]
public Node(
TestInfo testInfo,
NetworkAddress address,
NodeFinder nodeFinder,
Path workDir,
SystemLocalConfiguration systemLocalConfiguration,
RaftConfiguration raftConfiguration,
NodeAttributesConfiguration nodeAttributesConfiguration,
StorageConfiguration storageConfiguration,
SystemDistributedConfiguration systemConfiguration,
ReplicationConfiguration replicationConfiguration,
TransactionConfiguration transactionConfiguration,
ScheduledExecutorService scheduledExecutorService,
@Nullable InvokeInterceptor invokeInterceptor,
GcConfiguration gcConfiguration,
SqlLocalConfiguration sqlLocalConfiguration,
SqlDistributedConfiguration sqlDistributedConfiguration
) {
this.invokeInterceptor = invokeInterceptor;
name = testNodeName(testInfo, address.port());
Path dir = workDir.resolve(name);
vaultManager = createVault(dir);
nodeCfgGenerator = new ConfigurationTreeGenerator(
List.of(NodeConfiguration.KEY),
List.of(
NetworkExtensionConfigurationSchema.class,
StorageExtensionConfigurationSchema.class,
SystemLocalExtensionConfigurationSchema.class,
PersistentPageMemoryStorageEngineExtensionConfigurationSchema.class,
VolatilePageMemoryStorageEngineExtensionConfigurationSchema.class
),
List.of(
PersistentPageMemoryProfileConfigurationSchema.class,
VolatilePageMemoryProfileConfigurationSchema.class,
StaticNodeFinderConfigurationSchema.class,
MulticastNodeFinderConfigurationSchema.class
)
);
Path configPath = dir.resolve("config");
TestIgnitionManager.writeConfigurationFileApplyingTestDefaults(configPath);
nodeCfgMgr = new ConfigurationManager(
List.of(NodeConfiguration.KEY),
new LocalFileConfigurationStorage(configPath, nodeCfgGenerator, null),
nodeCfgGenerator,
new TestConfigurationValidator()
);
var clusterIdHolder = new ClusterIdHolder();
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
address.port(),
nodeFinder,
new InMemoryStaleIds(),
clusterIdHolder
);
lockManager = HeapLockManager.smallInstance();
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
ComponentWorkingDir partitionsWorkDir = partitionsPath(systemLocalConfiguration, dir);
partitionsLogStorageFactory = SharedLogStorageFactoryUtils.create(clusterService.nodeName(), partitionsWorkDir.raftLogPath());
RaftGroupOptionsConfigurer partitionRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageFactory, partitionsWorkDir.metaPath());
raftManager = new Loza(
clusterService,
new NoOpMetricManager(),
raftConfiguration,
hybridClock,
raftGroupEventsClientListener,
new NoOpFailureManager()
);
failureManager = new NoOpFailureManager();
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, failureManager);
var clusterInitializer = new ClusterInitializer(
clusterService,
hocon -> hocon,
new TestConfigurationValidator()
);
ComponentWorkingDir cmgWorkDir = new ComponentWorkingDir(dir.resolve("cmg"));
cmgLogStorageFactory =
SharedLogStorageFactoryUtils.create(clusterService.nodeName(), cmgWorkDir.raftLogPath());
RaftGroupOptionsConfigurer cmgRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(cmgLogStorageFactory, cmgWorkDir.metaPath());
cmgManager = new ClusterManagementGroupManager(
vaultManager,
new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
clusterStateStorage,
logicalTopology,
new NodeAttributesCollector(nodeAttributesConfiguration, storageConfiguration),
failureManager,
clusterIdHolder,
cmgRaftConfigurer
);
LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
var readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
var keyValueStorage = new RocksDbKeyValueStorage(
name,
resolveDir(dir, "metaStorageTestKeyValue"),
failureManager,
readOperationForCompactionTracker,
scheduledExecutorService
);
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
clusterService,
logicalTopologyService,
Loza.FACTORY,
raftGroupEventsClientListener
);
ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(dir.resolve("metastorage"));
msLogStorageFactory =
SharedLogStorageFactoryUtils.create(clusterService.nodeName(), metastorageWorkDir.raftLogPath());
RaftGroupOptionsConfigurer msRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(msLogStorageFactory, metastorageWorkDir.metaPath());
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
logicalTopologyService,
raftManager,
keyValueStorage,
hybridClock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
systemConfiguration,
msRaftConfigurer,
readOperationForCompactionTracker
) {
@Override
public CompletableFuture<Boolean> invoke(
Condition condition,
List<Operation> success,
List<Operation> failure
) {
InvokeInterceptor invokeInterceptor = Node.this.invokeInterceptor;
if (invokeInterceptor != null) {
Boolean res = invokeInterceptor.invoke(condition, success, failure);
if (res != null) {
return completedFuture(res);
}
}
return super.invoke(condition, success, failure);
}
};
threadPoolsManager = new ThreadPoolsManager(name);
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = () -> 10L;
ReplicaService replicaSvc = new ReplicaService(
clusterService.messagingService(),
hybridClock,
threadPoolsManager.partitionOperationsExecutor(),
replicationConfiguration,
threadPoolsManager.commonScheduler()
);
resourcesRegistry = new RemotelyTriggeredResourceRegistry();
clockWaiter = new ClockWaiter(name, hybridClock, threadPoolsManager.commonScheduler());
var clockService = new ClockServiceImpl(
hybridClock,
clockWaiter,
() -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS
);
placementDriverManager = new PlacementDriverManager(
name,
metaStorageManager,
MetastorageGroupId.INSTANCE,
clusterService,
cmgManager::metaStorageNodes,
logicalTopologyService,
raftManager,
topologyAwareRaftGroupServiceFactory,
clockService,
failureManager,
replicationConfiguration
);
var transactionInflights = new TransactionInflights(placementDriverManager.placementDriver(), clockService);
var cfgStorage = new DistributedConfigurationStorage("test", metaStorageManager);
clusterCfgGenerator = new ConfigurationTreeGenerator(
List.of(ClusterConfiguration.KEY),
List.of(
GcExtensionConfigurationSchema.class,
ReplicationExtensionConfigurationSchema.class,
SystemDistributedExtensionConfigurationSchema.class
),
List.of()
);
clusterCfgMgr = new ConfigurationManager(
List.of(ClusterConfiguration.KEY),
cfgStorage,
clusterCfgGenerator,
new TestConfigurationValidator()
);
ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
var registry = new MetaStorageRevisionListenerRegistry(metaStorageManager);
DataStorageModules dataStorageModules = new DataStorageModules(List.of(
new PersistentPageMemoryDataStorageModule(),
new NonVolatileTestDataStorageModule(),
new VolatilePageMemoryDataStorageModule()
));
Path storagePath = dir.resolve("storage");
dataStorageMgr = new DataStorageManager(
dataStorageModules.createStorageEngines(
name,
new NoOpMetricManager(),
nodeCfgMgr.configurationRegistry(),
dir.resolve("storage"),
null,
failureManager,
partitionsLogStorageFactory,
hybridClock,
scheduledExecutorService
),
storageConfiguration
);
lowWatermark = new LowWatermarkImpl(
name,
gcConfiguration.lowWatermark(),
clockService,
vaultManager,
failureManager,
clusterService.messagingService()
);
txManager = new TxManagerImpl(
transactionConfiguration,
systemConfiguration,
clusterService,
replicaSvc,
lockManager,
clockService,
new TransactionIdGenerator(address.port()),
placementDriverManager.placementDriver(),
partitionIdleSafeTimePropagationPeriodMsSupplier,
new TestLocalRwTxCounter(),
resourcesRegistry,
transactionInflights,
lowWatermark,
threadPoolsManager.commonScheduler()
);
volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(name, workDir.resolve("volatile-log-spillout-" + name));
replicaManager = new ReplicaManager(
name,
clusterService,
cmgManager,
clockService,
Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class),
placementDriverManager.placementDriver(),
threadPoolsManager.partitionOperationsExecutor(),
partitionIdleSafeTimePropagationPeriodMsSupplier,
new NoOpFailureManager(),
new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
topologyAwareRaftGroupServiceFactory,
raftManager,
partitionRaftConfigurer,
volatileLogStorageFactoryCreator,
threadPoolsManager.tableIoExecutor(),
replicaGrpId -> metaStorageManager.get(pendingPartAssignmentsQueueKey((ZonePartitionId) replicaGrpId))
.thenApply(Entry::value)
);
LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageManager, failureManager),
clockService,
failureManager,
delayDurationMsSupplier
);
raftManager.appendEntriesRequestInterceptor(new CheckCatalogVersionOnAppendEntries(catalogManager));
raftManager.actionRequestInterceptor(new CheckCatalogVersionOnActionRequest(catalogManager));
indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark, metaStorageManager);
schemaManager = new SchemaManager(registry, catalogManager);
schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier);
MinimumRequiredTimeCollectorService minTimeCollectorService = new MinimumRequiredTimeCollectorServiceImpl();
catalogCompactionRunner = new CatalogCompactionRunner(
name,
(CatalogManagerImpl) catalogManager,
clusterService.messagingService(),
logicalTopologyService,
placementDriverManager.placementDriver(),
replicaSvc,
clockService,
schemaSyncService,
clusterService.topologyService(),
clockService::nowLong,
minTimeCollectorService,
new RebalanceMinimumRequiredTimeProviderImpl(metaStorageManager, catalogManager));
((MetaStorageManagerImpl) metaStorageManager).addElectionListener(catalogCompactionRunner::updateCoordinator);
lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
params -> catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters) params).newLowWatermark()));
ScheduledExecutorService rebalanceScheduler = Executors.newScheduledThreadPool(
REBALANCE_SCHEDULER_POOL_SIZE,
NamedThreadFactory.create(name, "test-rebalance-scheduler", LOG)
);
SystemDistributedConfiguration systemDistributedConfiguration =
clusterConfigRegistry.getConfiguration(SystemDistributedExtensionConfiguration.KEY).system();
distributionZoneManager = new DistributionZoneManager(
name,
registry,
metaStorageManager,
logicalTopologyService,
catalogManager,
systemDistributedConfiguration,
clockService
);
sharedTxStateStorage = new TxStateRocksDbSharedStorage(
storagePath.resolve("tx-state"),
threadPoolsManager.commonScheduler(),
threadPoolsManager.tableIoExecutor(),
partitionsLogStorageFactory,
failureManager
);
outgoingSnapshotsManager = new OutgoingSnapshotsManager(name, clusterService.messagingService(), failureManager);
partitionReplicaLifecycleManager = new PartitionReplicaLifecycleManager(
catalogManager,
replicaManager,
distributionZoneManager,
metaStorageManager,
clusterService.topologyService(),
lowWatermark,
failureManager,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler,
threadPoolsManager.partitionOperationsExecutor(),
clockService,
placementDriverManager.placementDriver(),
schemaSyncService,
systemDistributedConfiguration,
sharedTxStateStorage,
txManager,
schemaManager,
dataStorageMgr,
outgoingSnapshotsManager
);
resourceVacuumManager = new ResourceVacuumManager(
name,
resourcesRegistry,
clusterService.topologyService(),
clusterService.messagingService(),
transactionInflights,
txManager,
lowWatermark,
failureManager
);
tableManager = new TableManager(
name,
registry,
gcConfiguration,
transactionConfiguration,
replicationConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
clusterService.serializationRegistry(),
replicaManager,
lockManager,
replicaSvc,
txManager,
dataStorageMgr,
sharedTxStateStorage,
metaStorageManager,
schemaManager,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
rebalanceScheduler,
threadPoolsManager.commonScheduler(),
clockService,
outgoingSnapshotsManager,
distributionZoneManager,
schemaSyncService,
catalogManager,
failureManager,
observableTimestampTracker,
placementDriverManager.placementDriver(),
() -> mock(IgniteSql.class),
resourcesRegistry,
lowWatermark,
transactionInflights,
indexMetaStorage,
partitionsLogStorageFactory,
partitionReplicaLifecycleManager,
minTimeCollectorService,
systemDistributedConfiguration
) {
@Override
protected MvTableStorage createTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
MvTableStorage storage = super.createTableStorage(tableDescriptor, zoneDescriptor);
return isMock(storage) ? storage : spy(storage);
}
@Override
protected TxStateStorage createTxStateTableStorage(
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor
) {
TxStateStorage storage = super.createTxStateTableStorage(tableDescriptor, zoneDescriptor);
return isMock(storage) ? storage : spy(storage);
}
};
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
indexManager = new IndexManager(
schemaManager,
tableManager,
catalogManager,
threadPoolsManager.tableIoExecutor(),
registry,
lowWatermark
);
indexBuildingManager = new IndexBuildingManager(
name,
replicaSvc,
catalogManager,
metaStorageManager,
indexManager,
indexMetaStorage,
placementDriverManager.placementDriver(),
clusterService,
logicalTopologyService,
clockService,
failureManager,
lowWatermark
);
systemViewManager = new SystemViewManagerImpl(name, catalogManager, failureManager);
sqlQueryProcessor = new SqlQueryProcessor(
clusterService,
logicalTopologyService,
tableManager,
schemaManager,
dataStorageMgr,
replicaSvc,
clockService,
schemaSyncService,
catalogManager,
new NoOpMetricManager(),
systemViewManager,
failureManager,
placementDriverManager.placementDriver(),
sqlDistributedConfiguration,
sqlLocalConfiguration,
transactionInflights,
txManager,
lowWatermark,
threadPoolsManager.commonScheduler(),
new KillCommandHandler(name, logicalTopologyService, clusterService.messagingService()),
mock(EventLog.class)
);
}