in modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java [193:467]
/**
 * Builds the components of a node by hand, starts them one by one and wraps them into a {@link PartialNode}.
 *
 * <p>The local node configuration is written to a file first; the vault and the local configuration manager are started
 * before any other component, and the remaining components are started in the order in which they are listed below.
 *
 * @param idx Node index; used to derive the node name, the default configuration (when {@code cfgString} is {@code null})
 *      and the transaction ID generator.
 * @param cfgString Local node configuration in HOCON format, or {@code null} to use {@code configurationString(idx)}.
 * @param revisionCallback Callback that is handed over to {@code partialNode(...)} as is; may be {@code null}.
 * @return The partial node, which is also registered in {@code partialNodes}.
 * @throws NodeConfigWriteException If the configuration file could not be written.
 */
private PartialNode startPartialNode(
        int idx,
        @Nullable @Language("HOCON") String cfgString,
        @Nullable Consumer<Long> revisionCallback
) {
    String name = testNodeName(testInfo, idx);

    Path dir = workDir.resolve(name);

    List<IgniteComponent> components = new ArrayList<>();

    VaultManager vault = createVault(name, dir);

    ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());

    // NOTE(review): the file is resolved against the shared workDir, not the node-specific dir, so every call of this
    // method writes to the same path - confirm that this is intended.
    Path configFile = workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
    String configString = cfgString == null ? configurationString(idx) : cfgString;
    try {
        Files.writeString(configFile, configString);
    } catch (IOException e) {
        throw new NodeConfigWriteException("Failed to write config content to file.", e);
    }

    var localConfigurationGenerator = new ConfigurationTreeGenerator(
            modules.local().rootKeys(),
            modules.local().internalSchemaExtensions(),
            modules.local().polymorphicSchemaExtensions()
    );

    var nodeCfgMgr = new ConfigurationManager(
            modules.local().rootKeys(),
            new LocalFileConfigurationStorage(configFile, localConfigurationGenerator),
            localConfigurationGenerator,
            ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators())
    );

    NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);

    var nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, name);

    var clusterSvc = new TestScaleCubeClusterServiceFactory().createClusterService(
            name,
            networkConfiguration,
            nettyBootstrapFactory,
            defaultSerializationRegistry(),
            new VaultStateIds(vault)
    );

    // A single clock instance is shared by every component created below.
    HybridClock hybridClock = new HybridClockImpl();

    var raftGroupEventsClientListener = new RaftGroupEventsClientListener();

    var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock, raftGroupEventsClientListener);

    var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg"));

    var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);

    var cmgManager = new ClusterManagementGroupManager(
            vault,
            clusterSvc,
            raftMgr,
            clusterStateStorage,
            logicalTopology,
            clusterManagementConfiguration,
            nodeAttributes,
            new TestConfigurationValidator());

    ReplicaManager replicaMgr = new ReplicaManager(
            name,
            clusterSvc,
            cmgManager,
            hybridClock,
            Set.of(TableMessageGroup.class, TxMessageGroup.class)
    );

    // One replica service is shared by the transaction manager, the table manager and the SQL query processor.
    var replicaService = new ReplicaService(clusterSvc.messagingService(), hybridClock);

    var lockManager = new HeapLockManager();

    var txManager = new TxManagerImpl(replicaService, lockManager, hybridClock, new TransactionIdGenerator(idx));

    var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);

    var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
            clusterSvc,
            logicalTopologyService,
            Loza.FACTORY,
            raftGroupEventsClientListener
    );

    var metaStorageMgr = new MetaStorageManagerImpl(
            vault,
            clusterSvc,
            cmgManager,
            logicalTopologyService,
            raftMgr,
            new RocksDbKeyValueStorage(name, dir.resolve("metastorage")),
            hybridClock,
            topologyAwareRaftGroupServiceFactory,
            metaStorageConfiguration
    );

    var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);

    ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator(
            modules.distributed().rootKeys(),
            modules.distributed().internalSchemaExtensions(),
            modules.distributed().polymorphicSchemaExtensions()
    );

    var clusterCfgMgr = new ConfigurationManager(
            modules.distributed().rootKeys(),
            cfgStorage,
            distributedConfigurationGenerator,
            ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator, modules.distributed().validators())
    );

    ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();

    // Forwards every supplied function to the meta storage manager as a revision update listener.
    Consumer<LongFunction<CompletableFuture<?>>> registry = (c) -> metaStorageMgr.registerRevisionUpdateListener(c::apply);

    var baselineManager = new BaselineManager(
            clusterCfgMgr,
            metaStorageMgr,
            clusterSvc
    );

    DataStorageModules dataStorageModules = new DataStorageModules(ServiceLoader.load(DataStorageModule.class));

    Path storagePath = getPartitionsStorePath(dir);

    DistributionZonesConfiguration zonesConfig = clusterConfigRegistry.getConfiguration(DistributionZonesConfiguration.KEY);

    DataStorageManager dataStorageManager = new DataStorageManager(
            zonesConfig,
            dataStorageModules.createStorageEngines(
                    name,
                    clusterConfigRegistry,
                    storagePath,
                    null
            )
    );

    TablesConfiguration tablesConfig = clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);

    GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);

    SchemaManager schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr);

    DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
            null,
            zonesConfig,
            tablesConfig,
            metaStorageMgr,
            logicalTopologyService,
            vault,
            name
    );

    var clockWaiter = new ClockWaiter("test", hybridClock);

    var catalogManager = new CatalogManagerImpl(
            new UpdateLogImpl(metaStorageMgr),
            clockWaiter
    );

    TableManager tableManager = new TableManager(
            name,
            registry,
            tablesConfig,
            zonesConfig,
            gcConfig,
            clusterSvc,
            raftMgr,
            replicaMgr,
            lockManager,
            replicaService,
            baselineManager,
            clusterSvc.topologyService(),
            txManager,
            dataStorageManager,
            storagePath,
            metaStorageMgr,
            schemaManager,
            view -> new LocalLogStorageFactory(),
            hybridClock,
            new OutgoingSnapshotsManager(clusterSvc.messagingService()),
            topologyAwareRaftGroupServiceFactory,
            vault,
            null,
            null
    );

    var indexManager = new IndexManager(tablesConfig, schemaManager, tableManager);

    var metricManager = new MetricManager();

    SqlQueryProcessor qryEngine = new SqlQueryProcessor(
            registry,
            clusterSvc,
            tableManager,
            indexManager,
            schemaManager,
            dataStorageManager,
            txManager,
            distributionZoneManager,
            () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
            replicaService,
            hybridClock,
            catalogManager,
            metricManager
    );

    // Preparing the resulting list of components: the vault and the local configuration manager come first.
    components.add(vault);
    components.add(nodeCfgMgr);

    // Start the vault and the local configuration manager before everything else.
    vault.start();

    vault.putName(name).join();

    nodeCfgMgr.start();

    // Start the remaining components in the listed order.
    List<IgniteComponent> otherComponents = List.of(
            nettyBootstrapFactory,
            clusterSvc,
            raftMgr,
            clusterStateStorage,
            cmgManager,
            replicaMgr,
            txManager,
            baselineManager,
            metaStorageMgr,
            clusterCfgMgr,
            dataStorageManager,
            clockWaiter,
            catalogManager,
            schemaManager,
            distributionZoneManager,
            tableManager,
            indexManager,
            qryEngine
    );

    // NOTE(review): if one of the start() calls throws, the components started so far are not stopped here and the node
    // is never added to partialNodes - confirm that the test teardown covers this case.
    for (IgniteComponent component : otherComponents) {
        component.start();

        // A component is recorded only after it has been started.
        components.add(component);
    }

    PartialNode partialNode = partialNode(
            nodeCfgMgr,
            clusterCfgMgr,
            metaStorageMgr,
            revisionCallback,
            components,
            localConfigurationGenerator,
            logicalTopology,
            cfgStorage,
            distributedConfigurationGenerator,
            clusterConfigRegistry
    );

    partialNodes.add(partialNode);

    return partialNode;
}