in modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java [191:371]
/**
 * Assembles and starts a partial node: vault, local and distributed configuration managers, cluster service,
 * a mocked cluster management group manager, a spied standalone meta storage manager, catalog manager and
 * distribution zone manager.
 *
 * <p>The vault and the node configuration manager are started synchronously. The rest of the components are only
 * asked to start, their start futures are not awaited here. Before returning, the method waits for the catalog
 * initialization future, and registers the created node in {@code partialNodes}.
 *
 * @param idx Node index, used to derive the node name and the node configuration string.
 * @param metaStorageMocker Callback that receives the spied meta storage manager right after it is created,
 *         before {@code blockMetaStorageUpdates} is applied to it and before any component that uses it is built.
 * @return Started partial node.
 * @throws NodeConfigWriteException If the node configuration cannot be written to the configuration file.
 */
private PartialNode startPartialNode(int idx, Consumer<MetaStorageManager> metaStorageMocker) {
    String nodeName = testNodeName(testInfo, idx);
    Path nodeDir = workDir.resolve(nodeName);

    VaultManager vaultManager = createVault(nodeDir);
    var clusterStateStorage = TestClusterStateStorage.initializedClusterStateStorage();
    ClusterIdService clusterIdService = new ClusterIdService(vaultManager);

    ConfigurationModules cfgModules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());

    // Node-local configuration is read from a file, so the file has to exist before the manager is created.
    Path nodeConfigFile = workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
    try {
        Files.writeString(nodeConfigFile, configurationString(idx));
    } catch (IOException e) {
        throw new NodeConfigWriteException("Failed to write config content to file.", e);
    }

    ConfigurationTreeGenerator localTreeGenerator = new ConfigurationTreeGenerator(
            cfgModules.local().rootKeys(), cfgModules.local().schemaExtensions(), cfgModules.local().polymorphicSchemaExtensions()
    );

    ConfigurationManager nodeConfigManager = new ConfigurationManager(
            cfgModules.local().rootKeys(),
            new LocalFileConfigurationStorage(nodeConfigFile, localTreeGenerator, cfgModules.local()),
            localTreeGenerator,
            ConfigurationValidatorImpl.withDefaultValidators(localTreeGenerator, cfgModules.local().validators())
    );

    NetworkConfiguration networkCfg = nodeConfigManager.configurationRegistry()
            .getConfiguration(NetworkExtensionConfiguration.KEY)
            .network();

    NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkCfg, nodeName);

    // A single mocked failure processor is shared by every component that needs one.
    FailureProcessor failureProcessorMock = mock(FailureProcessor.class);

    var clusterService = new TestScaleCubeClusterServiceFactory().createClusterService(
            nodeName,
            networkCfg,
            bootstrapFactory,
            defaultSerializationRegistry(),
            new VaultStaleIds(vaultManager),
            clusterIdService,
            new NoOpCriticalWorkerRegistry(),
            failureProcessorMock,
            defaultChannelTypeRegistry(),
            new DefaultIgniteProductVersionSource()
    );

    LogicalTopologyImpl logicalTopology = new LogicalTopologyImpl(clusterStateStorage, failureProcessorMock);

    // The CMG manager is a mock: it serves the logical topology from the object above and starts/stops trivially.
    ClusterManagementGroupManager cmgManagerMock = mock(ClusterManagementGroupManager.class);
    when(cmgManagerMock.logicalTopology()).thenAnswer(ignored -> completedFuture(logicalTopology.getLogicalTopology()));
    when(cmgManagerMock.startAsync(any())).thenReturn(nullCompletedFuture());
    when(cmgManagerMock.stopAsync(any())).thenReturn(nullCompletedFuture());

    ReadOperationForCompactionTracker compactionReadTracker = new ReadOperationForCompactionTracker();

    RocksDbKeyValueStorage keyValueStorage = new RocksDbKeyValueStorage(
            nodeName, workDir.resolve("metastorage"), failureProcessorMock, compactionReadTracker, commonScheduledExecutorService
    );

    HybridClockImpl hybridClock = new HybridClockImpl();

    // The meta storage manager is kept in a field; the caller-provided mocker is applied to the spy first.
    metastore = spy(StandaloneMetaStorageManager.create(keyValueStorage, hybridClock, compactionReadTracker));
    metaStorageMocker.accept(metastore);
    blockMetaStorageUpdates(metastore);

    MetaStorageRevisionListenerRegistry revisionListenerRegistry = new MetaStorageRevisionListenerRegistry(metastore);

    DistributedConfigurationStorage distributedCfgStorage = new DistributedConfigurationStorage("test", metastore);

    var distributedTreeGenerator = new ConfigurationTreeGenerator(
            cfgModules.distributed().rootKeys(),
            cfgModules.distributed().schemaExtensions(),
            cfgModules.distributed().polymorphicSchemaExtensions()
    );

    // Distributed validators are used as is, except for the authentication providers validator, which is dropped.
    Set<Validator<?, ?>> distributedValidators = new HashSet<>(cfgModules.distributed().validators());
    distributedValidators.remove(AuthenticationProvidersValidatorImpl.INSTANCE);

    ConfigurationManager clusterConfigManager = new ConfigurationManager(
            cfgModules.distributed().rootKeys(),
            distributedCfgStorage,
            distributedTreeGenerator,
            ConfigurationValidatorImpl.withDefaultValidators(distributedTreeGenerator, distributedValidators)
    );

    ConfigurationRegistry clusterRegistry = clusterConfigManager.configurationRegistry();

    var topologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManagerMock);

    ClockWaiter clockWaiter = new ClockWaiter(nodeName, hybridClock, commonScheduledExecutorService);

    ClockService clockService = new TestClockService(hybridClock, clockWaiter);

    CatalogManagerImpl catalogManager = new CatalogManagerImpl(
            new UpdateLogImpl(metastore, failureProcessorMock),
            clockService,
            failureProcessorMock,
            () -> TEST_DELAY_DURATION
    );

    var zoneManager = new DistributionZoneManager(
            nodeName,
            revisionListenerRegistry,
            metastore,
            topologyService,
            catalogManager,
            clusterConfigManager.configurationRegistry().getConfiguration(SystemDistributedExtensionConfiguration.KEY).system(),
            clockService
    );

    // Components are collected in the order in which they are started.
    List<IgniteComponent> components = new ArrayList<>();
    components.add(vaultManager);
    components.add(nodeConfigManager);

    var startContext = new ComponentContext();

    // The vault and the node configuration manager are started one by one, waiting for each of them.
    assertThat(vaultManager.startAsync(startContext), willCompleteSuccessfully());
    vaultManager.putName(nodeName);

    assertThat(nodeConfigManager.startAsync(startContext), willCompleteSuccessfully());

    // Everything else is started in the listed order without waiting for completion.
    List<IgniteComponent> remainingComponents = List.of(
            clusterStateStorage,
            clusterIdService,
            bootstrapFactory,
            clusterService,
            cmgManagerMock,
            metastore,
            clusterConfigManager,
            clockWaiter,
            catalogManager,
            zoneManager
    );

    remainingComponents.forEach(component -> {
        // TODO: IGNITE-22119 required to be able to wait on this future.
        component.startAsync(startContext);

        components.add(component);
    });

    PartialNode node = partialNode(
            nodeName,
            nodeConfigManager,
            clusterConfigManager,
            metastore,
            components,
            localTreeGenerator,
            logicalTopology,
            distributedCfgStorage,
            distributedTreeGenerator,
            clusterRegistry,
            hybridClock
    );

    // The node is handed out only after the catalog has been initialized.
    assertThat(catalogManager.catalogInitializationFuture(), willCompleteSuccessfully());

    partialNodes.add(node);

    return node;
}