in modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java [340:915]
/**
 * Wires up and starts a subset of Ignite components that together act as a "partial" node in this test.
 *
 * <p>The vault and the local configuration manager are started first and their start futures are asserted to complete
 * successfully. Every other component is started in the fixed order listed in {@code otherComponents}; those start futures
 * are not awaited (see the TODO next to the loop). The resulting node is registered in {@code partialNodes} before it is
 * returned.
 *
 * <p>The order of statements in this method matters: {@code txConfiguration} and {@code systemDistributedConfiguration}
 * are referenced before locals with the same names are declared further down, so early and late references bind to
 * different declarations. Do not reorder the declarations without checking every usage.
 *
 * @param idx Node index. Used to derive the node name and the default configuration, to seed the
 *         {@code TransactionIdGenerator}, and to look up the per-node entries in {@code metaStorageInvokeInterceptorByNode}
 *         and {@code dataNodesMockByNode}.
 * @param cfgString Node configuration in HOCON format, or {@code null} to use {@code configurationString(idx)}.
 * @return The started partial node.
 * @throws NodeConfigWriteException If the configuration content could not be written to the configuration file.
 */
private PartialNode startPartialNode(
        int idx,
        @Nullable @Language("HOCON") String cfgString
) {
    String name = testNodeName(testInfo, idx);

    // Node-specific directory; most per-node storages below are resolved against it.
    Path dir = workDir.resolve(name);

    // Accumulates components in start order; handed over to the PartialNode at the end.
    List<IgniteComponent> components = new ArrayList<>();

    VaultManager vault = createVault(dir);

    var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg"), name);

    var clusterIdService = new ClusterIdService(vault);

    ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());

    // NOTE(review): the config file is resolved against workDir, not the node-specific dir - confirm this is intended.
    Path configFile = workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
    String configString = cfgString == null ? configurationString(idx) : cfgString;
    try {
        Files.writeString(configFile, configString);
    } catch (IOException e) {
        throw new NodeConfigWriteException("Failed to write config content to file.", e);
    }

    // Local (node) configuration.
    var localConfigurationGenerator = new ConfigurationTreeGenerator(
            modules.local().rootKeys(),
            modules.local().schemaExtensions(),
            modules.local().polymorphicSchemaExtensions()
    );

    var nodeCfgMgr = new ConfigurationManager(
            modules.local().rootKeys(),
            new LocalFileConfigurationStorage(configFile, localConfigurationGenerator, modules.local()),
            localConfigurationGenerator,
            ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators())
    );

    NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry()
            .getConfiguration(NetworkExtensionConfiguration.KEY).network();

    var threadPoolsManager = new ThreadPoolsManager(name);

    var failureProcessor = new NoOpFailureManager();

    var workerRegistry = new CriticalWorkerWatchdog(workersConfiguration, threadPoolsManager.commonScheduler(), failureProcessor);

    // Networking.
    var nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, name);

    var nettyWorkersRegistrar = new NettyWorkersRegistrar(
            workerRegistry,
            threadPoolsManager.commonScheduler(),
            nettyBootstrapFactory,
            workersConfiguration,
            failureProcessor
    );

    var clusterSvc = new TestScaleCubeClusterServiceFactory().createClusterService(
            name,
            networkConfiguration,
            nettyBootstrapFactory,
            defaultSerializationRegistry(),
            new VaultStaleIds(vault),
            clusterIdService,
            workerRegistry,
            failureProcessor,
            defaultChannelTypeRegistry(),
            new DefaultIgniteProductVersionSource()
    );

    var hybridClock = new HybridClockImpl();

    var raftGroupEventsClientListener = new RaftGroupEventsClientListener();

    // Raft log storage and group options for table partitions.
    ComponentWorkingDir partitionsWorkDir = partitionsPath(systemConfiguration, dir);

    LogStorageFactory partitionsLogStorageFactory =
            SharedLogStorageFactoryUtils.create(clusterSvc.nodeName(), partitionsWorkDir.raftLogPath());

    RaftGroupOptionsConfigurer partitionRaftConfigurer =
            RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageFactory, partitionsWorkDir.metaPath());

    var raftMgr = TestLozaFactory.create(
            clusterSvc,
            raftConfiguration,
            hybridClock,
            raftGroupEventsClientListener
    );

    var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, failureProcessor);

    var clusterInitializer = new ClusterInitializer(
            clusterSvc,
            hocon -> hocon,
            new TestConfigurationValidator()
    );

    // Raft log storage and group options for the cluster management group (CMG).
    // NOTE(review): resolved against workDir, while clusterStateStorage above uses dir.resolve("cmg") - confirm.
    ComponentWorkingDir cmgWorkDir = new ComponentWorkingDir(workDir.resolve("cmg"));

    LogStorageFactory cmgLogStorageFactory =
            SharedLogStorageFactoryUtils.create(clusterSvc.nodeName(), cmgWorkDir.raftLogPath());

    RaftGroupOptionsConfigurer cmgRaftConfigurer =
            RaftGroupOptionsConfigHelper.configureProperties(cmgLogStorageFactory, cmgWorkDir.metaPath());

    StorageConfiguration storageConfiguration = nodeCfgMgr.configurationRegistry()
            .getConfiguration(StorageExtensionConfiguration.KEY).storage();

    var cmgManager = new ClusterManagementGroupManager(
            vault,
            new SystemDisasterRecoveryStorage(vault),
            clusterSvc,
            clusterInitializer,
            raftMgr,
            clusterStateStorage,
            logicalTopology,
            new NodeAttributesCollector(nodeAttributes, storageConfiguration),
            failureProcessor,
            clusterIdService,
            cmgRaftConfigurer
    );

    LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier
            = () -> TestIgnitionManager.DEFAULT_PARTITION_IDLE_SYNC_TIME_INTERVAL_MS;

    // Messaging service wrapper configured with the partition operations executor.
    MessagingService messagingServiceReturningToStorageOperationsPool = new JumpToExecutorByConsistentIdAfterSend(
            clusterSvc.messagingService(),
            name,
            message -> threadPoolsManager.partitionOperationsExecutor()
    );

    var replicaService = new ReplicaService(
            messagingServiceReturningToStorageOperationsPool,
            hybridClock,
            threadPoolsManager.partitionOperationsExecutor(),
            replicationConfiguration,
            threadPoolsManager.commonScheduler()
    );

    var lockManager = new HeapLockManager(systemConfiguration);

    var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);

    var metricManager = new MetricManagerImpl();

    var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
            clusterSvc,
            logicalTopologyService,
            Loza.FACTORY,
            raftGroupEventsClientListener
    );

    var readOperationForCompactionTracker = new ReadOperationForCompactionTracker();

    // NOTE(review): a separate NoOpFailureManager instance is passed here instead of failureProcessor - confirm intended.
    var metaStorage = new RocksDbKeyValueStorage(
            name,
            dir.resolve("metastorage"),
            new NoOpFailureManager(),
            readOperationForCompactionTracker,
            scheduledExecutorService
    );

    // May be null when no interceptor is registered for this node index; checked in the override below.
    InvokeInterceptor metaStorageInvokeInterceptor = metaStorageInvokeInterceptorByNode.get(idx);

    // Raft log storage and group options for the meta storage group.
    ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(workDir.resolve("metastorage"));

    LogStorageFactory msLogStorageFactory =
            SharedLogStorageFactoryUtils.create(clusterSvc.nodeName(), metastorageWorkDir.raftLogPath());

    RaftGroupOptionsConfigurer msRaftConfigurer =
            RaftGroupOptionsConfigHelper.configureProperties(msLogStorageFactory, metastorageWorkDir.metaPath());

    // systemDistributedConfiguration here binds to a declaration outside this method (presumably an injected test
    // field): the local with the same name is declared only later.
    var metaStorageMgr = new MetaStorageManagerImpl(
            clusterSvc,
            cmgManager,
            logicalTopologyService,
            raftMgr,
            metaStorage,
            hybridClock,
            topologyAwareRaftGroupServiceFactory,
            metricManager,
            systemDistributedConfiguration,
            msRaftConfigurer,
            readOperationForCompactionTracker
    ) {
        @Override
        public CompletableFuture<Boolean> invoke(Condition condition, List<Operation> success, List<Operation> failure) {
            // A non-null interceptor result short-circuits the real invoke; null falls through to it.
            if (metaStorageInvokeInterceptor != null) {
                var res = metaStorageInvokeInterceptor.invoke(condition, success, failure);

                if (res != null) {
                    return completedFuture(res);
                }
            }

            return super.invoke(condition, success, failure);
        }
    };

    // Distributed (cluster) configuration, stored via the meta storage manager.
    var cfgStorage = new DistributedConfigurationStorage("test", metaStorageMgr);

    ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator(
            modules.distributed().rootKeys(),
            modules.distributed().schemaExtensions(),
            modules.distributed().polymorphicSchemaExtensions()
    );

    var clusterCfgMgr = new ConfigurationManager(
            modules.distributed().rootKeys(),
            cfgStorage,
            distributedConfigurationGenerator,
            ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator, modules.distributed().validators())
    );

    ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();

    var clockWaiter = new ClockWaiter(name, hybridClock, scheduledExecutorService);

    SchemaSynchronizationConfiguration schemaSyncConfiguration = clusterConfigRegistry
            .getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync();

    ClockService clockService = new ClockServiceImpl(
            hybridClock,
            clockWaiter,
            () -> schemaSyncConfiguration.maxClockSkewMillis().value()
    );

    var placementDriverManager = new PlacementDriverManager(
            name,
            metaStorageMgr,
            MetastorageGroupId.INSTANCE,
            clusterSvc,
            cmgManager::metaStorageNodes,
            logicalTopologyService,
            raftMgr,
            topologyAwareRaftGroupServiceFactory,
            clockService,
            failureProcessor,
            clusterConfigRegistry.getConfiguration(ReplicationExtensionConfiguration.KEY).replication()
    );

    // NOTE(review): this scheduler is not added to the components list; nothing in this method shuts it down.
    ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
            NamedThreadFactory.create(name, "test-rebalance-scheduler", logger()));

    ReplicaManager replicaMgr = new ReplicaManager(
            name,
            clusterSvc,
            cmgManager,
            clockService,
            Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class),
            placementDriverManager.placementDriver(),
            threadPoolsManager.partitionOperationsExecutor(),
            partitionIdleSafeTimePropagationPeriodMsSupplier,
            failureProcessor,
            new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()),
            topologyAwareRaftGroupServiceFactory,
            raftMgr,
            partitionRaftConfigurer,
            view -> new LocalLogStorageFactory(),
            threadPoolsManager.tableIoExecutor(),
            replicaGrpId -> metaStorageMgr.get(pendingPartAssignmentsQueueKey((TablePartitionId) replicaGrpId))
                    .thenApply(Entry::value)
    );

    var resourcesRegistry = new RemotelyTriggeredResourceRegistry();

    GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcExtensionConfiguration.KEY).gc();

    var lowWatermark = new LowWatermarkImpl(
            name,
            gcConfig.lowWatermark(),
            clockService,
            vault,
            failureProcessor,
            clusterSvc.messagingService()
    );

    TransactionInflights transactionInflights = new TransactionInflights(placementDriverManager.placementDriver(), clockService);

    // txConfiguration and systemDistributedConfiguration here bind to declarations outside this method (presumably
    // injected test fields): the locals with the same names are declared only later.
    var txManager = new TxManagerImpl(
            name,
            txConfiguration,
            systemDistributedConfiguration,
            messagingServiceReturningToStorageOperationsPool,
            clusterSvc.topologyService(),
            replicaService,
            lockManager,
            clockService,
            new TransactionIdGenerator(idx),
            placementDriverManager.placementDriver(),
            partitionIdleSafeTimePropagationPeriodMsSupplier,
            new TestLocalRwTxCounter(),
            threadPoolsManager.partitionOperationsExecutor(),
            resourcesRegistry,
            transactionInflights,
            lowWatermark,
            threadPoolsManager.commonScheduler(),
            failureProcessor
    );

    ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager(
            name,
            resourcesRegistry,
            clusterSvc.topologyService(),
            clusterSvc.messagingService(),
            transactionInflights,
            txManager,
            lowWatermark,
            failureProcessor
    );

    var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);

    // Storage engines are discovered through ServiceLoader.
    DataStorageModules dataStorageModules = new DataStorageModules(
            ServiceLoader.load(DataStorageModule.class)
    );

    Path storagePath = getPartitionsStorePath(dir);

    DataStorageManager dataStorageManager = new DataStorageManager(
            dataStorageModules.createStorageEngines(
                    name,
                    new NoOpMetricManager(),
                    nodeCfgMgr.configurationRegistry(),
                    storagePath,
                    null,
                    failureProcessor,
                    partitionsLogStorageFactory,
                    hybridClock,
                    scheduledExecutorService
            ),
            storageConfiguration
    );

    // From this point on, txConfiguration refers to this local, taken from the cluster configuration registry.
    TransactionConfiguration txConfiguration = clusterConfigRegistry
            .getConfiguration(TransactionExtensionConfiguration.KEY).transaction();

    LongSupplier delayDurationMsSupplier = () -> TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;

    var catalogManager = new CatalogManagerImpl(
            new UpdateLogImpl(metaStorageMgr, failureProcessor),
            clockService,
            failureProcessor,
            delayDurationMsSupplier
    );

    var indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark, metaStorageMgr);

    SchemaManager schemaManager = new SchemaManager(registry, catalogManager);

    // May be null when no data nodes mock is registered for this node index; checked in the override below.
    var dataNodesMock = dataNodesMockByNode.get(idx);

    // From this point on, systemDistributedConfiguration refers to this local, taken from the cluster configuration registry.
    SystemDistributedConfiguration systemDistributedConfiguration =
            clusterConfigRegistry.getConfiguration(SystemDistributedExtensionConfiguration.KEY).system();

    DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
            name,
            registry,
            metaStorageMgr,
            logicalTopologyService,
            catalogManager,
            systemDistributedConfiguration,
            clockService
    ) {
        @Override
        public CompletableFuture<Set<String>> dataNodes(HybridTimestamp timestamp, int catalogVersion, int zoneId) {
            // When a mock is registered for this node, it replaces the real data nodes calculation.
            if (dataNodesMock != null) {
                return dataNodesMock.get();
            }

            return super.dataNodes(timestamp, catalogVersion, zoneId);
        }
    };

    var schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);

    // TableManager receives sqlRef::get below; the reference itself is populated only after the query processor is created.
    var sqlRef = new AtomicReference<IgniteSqlImpl>();

    MinimumRequiredTimeCollectorService minTimeCollectorService = new MinimumRequiredTimeCollectorServiceImpl();

    var sharedTxStateStorage = new TxStateRocksDbSharedStorage(
            storagePath.resolve("tx-state"),
            threadPoolsManager.commonScheduler(),
            threadPoolsManager.tableIoExecutor(),
            partitionsLogStorageFactory,
            failureProcessor
    );

    var outgoingSnapshotManager = new OutgoingSnapshotsManager(name, clusterSvc.messagingService(), failureProcessor);

    // NOTE(review): this manager is passed to TableManager but is not in the otherComponents start list - confirm intended.
    var partitionReplicaLifecycleListener = new PartitionReplicaLifecycleManager(
            catalogManager,
            replicaMgr,
            distributionZoneManager,
            metaStorageMgr,
            clusterSvc.topologyService(),
            lowWatermark,
            failureProcessor,
            threadPoolsManager.tableIoExecutor(),
            rebalanceScheduler,
            threadPoolsManager.partitionOperationsExecutor(),
            clockService,
            placementDriverManager.placementDriver(),
            schemaSyncService,
            systemDistributedConfiguration,
            sharedTxStateStorage,
            txManager,
            schemaManager,
            dataStorageManager,
            outgoingSnapshotManager
    );

    TableManager tableManager = new TableManager(
            name,
            registry,
            gcConfig,
            txConfiguration,
            replicationConfiguration,
            messagingServiceReturningToStorageOperationsPool,
            clusterSvc.topologyService(),
            clusterSvc.serializationRegistry(),
            replicaMgr,
            lockManager,
            replicaService,
            txManager,
            dataStorageManager,
            sharedTxStateStorage,
            metaStorageMgr,
            schemaManager,
            threadPoolsManager.tableIoExecutor(),
            threadPoolsManager.partitionOperationsExecutor(),
            rebalanceScheduler,
            threadPoolsManager.commonScheduler(),
            clockService,
            outgoingSnapshotManager,
            distributionZoneManager,
            schemaSyncService,
            catalogManager,
            failureProcessor,
            HybridTimestampTracker.atomicTracker(null),
            placementDriverManager.placementDriver(),
            sqlRef::get,
            resourcesRegistry,
            lowWatermark,
            transactionInflights,
            indexMetaStorage,
            partitionsLogStorageFactory,
            partitionReplicaLifecycleListener,
            minTimeCollectorService,
            systemDistributedConfiguration
    );

    var indexManager = new IndexManager(
            schemaManager,
            tableManager,
            catalogManager,
            threadPoolsManager.tableIoExecutor(),
            registry,
            lowWatermark
    );

    // Event log stub that drops every event.
    EventLog noopEventLog = new EventLog() {
        @Override
        public void log(Event event) {
            // No-op.
        }

        @Override
        public void log(String type, Supplier<Event> eventProvider) {
            // No-op.
        }
    };

    SqlQueryProcessor qryEngine = new SqlQueryProcessor(
            clusterSvc,
            logicalTopologyService,
            tableManager,
            schemaManager,
            dataStorageManager,
            replicaService,
            clockService,
            schemaSyncService,
            catalogManager,
            metricManager,
            new SystemViewManagerImpl(name, catalogManager, failureProcessor),
            failureProcessor,
            placementDriverManager.placementDriver(),
            clusterConfigRegistry.getConfiguration(SqlClusterExtensionConfiguration.KEY).sql(),
            nodeCfgMgr.configurationRegistry().getConfiguration(SqlNodeExtensionConfiguration.KEY).sql(),
            transactionInflights,
            txManager,
            lowWatermark,
            threadPoolsManager.commonScheduler(),
            new KillCommandHandler(name, logicalTopologyService, clusterSvc.messagingService()),
            noopEventLog
    );

    sqlRef.set(new IgniteSqlImpl(qryEngine, HybridTimestampTracker.atomicTracker(null), threadPoolsManager.commonScheduler()));

    // Preparing the list of components: the vault and the local configuration manager go first.
    components.add(vault);
    components.add(nodeCfgMgr);

    // Start the vault and the local configuration manager, waiting for each to complete successfully.
    assertThat(vault.startAsync(new ComponentContext()), willCompleteSuccessfully());

    vault.putName(name);

    assertThat(nodeCfgMgr.startAsync(new ComponentContext()), willCompleteSuccessfully());

    // Start the remaining components, in exactly this order.
    List<IgniteComponent> otherComponents = List.of(
            threadPoolsManager,
            failureProcessor,
            workerRegistry,
            clusterStateStorage,
            clusterIdService,
            nettyBootstrapFactory,
            nettyWorkersRegistrar,
            clusterSvc,
            partitionsLogStorageFactory,
            cmgLogStorageFactory,
            msLogStorageFactory,
            raftMgr,
            cmgManager,
            replicaMgr,
            txManager,
            resourceVacuumManager,
            lowWatermark,
            metaStorageMgr,
            clusterCfgMgr,
            dataStorageManager,
            clockWaiter,
            catalogManager,
            indexMetaStorage,
            schemaManager,
            distributionZoneManager,
            sharedTxStateStorage,
            tableManager,
            indexManager,
            qryEngine,
            sqlRef.get()
    );

    for (IgniteComponent component : otherComponents) {
        // The start future is intentionally not awaited here.
        // TODO: IGNITE-22119 required to be able to wait on this future.
        component.startAsync(new ComponentContext());

        components.add(component);
    }

    lowWatermark.scheduleUpdates();

    PartialNode partialNode = partialNode(
            name,
            nodeCfgMgr,
            clusterCfgMgr,
            metaStorageMgr,
            components,
            localConfigurationGenerator,
            logicalTopology,
            cfgStorage,
            distributedConfigurationGenerator,
            clusterConfigRegistry,
            hybridClock
    );

    // Register the node in the test's partialNodes collection before handing it back.
    partialNodes.add(partialNode);

    return partialNode;
}