in server/src/main/java/org/elasticsearch/node/NodeConstruction.java [658:1294]
private void construct(
ThreadPool threadPool,
SettingsModule settingsModule,
SearchModule searchModule,
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
NodeServiceProvider serviceProvider,
boolean forbidPrivateIndexSettings,
TelemetryProvider telemetryProvider,
DocumentParsingProvider documentParsingProvider
) throws IOException {
Settings settings = settingsModule.getSettings();
modules.bindToInstance(Tracer.class, telemetryProvider.getTracer());
TaskManager taskManager = new TaskManager(
settings,
threadPool,
Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()),
Task.HEADERS_TO_COPY.stream()
).collect(Collectors.toSet()),
telemetryProvider.getTracer()
);
// serverless deployments plug-in the multi-project resolver factory
ProjectResolver projectResolver = pluginsService.loadSingletonServiceProvider(
ProjectResolverFactory.class,
() -> ProjectResolverFactory.DEFAULT
).create();
modules.bindToInstance(ProjectResolver.class, projectResolver);
ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
clusterService.addStateApplier(scriptService);
modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry());
final IngestService ingestService = new IngestService(
clusterService,
threadPool,
environment,
scriptService,
analysisRegistry,
pluginsService.filterPlugins(IngestPlugin.class).toList(),
client,
IngestService.createGrokThreadWatchdog(environment, threadPool),
failureStoreMetrics,
projectResolver,
featureService
);
SystemIndices systemIndices = createSystemIndices(settings);
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
new CircuitBreakerMetrics(telemetryProvider),
settingsModule.getSettings(),
settingsModule.getClusterSettings()
);
PageCacheRecycler pageCacheRecycler = serviceProvider.newPageCacheRecycler(pluginsService, settings);
BigArrays bigArrays = serviceProvider.newBigArrays(pluginsService, pageCacheRecycler, circuitBreakerService);
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(
environment,
pluginsService.filterPlugins(RepositoryPlugin.class).toList(),
client,
threadPool,
clusterService,
bigArrays,
xContentRegistry,
recoverySettings,
telemetryProvider
);
RepositoriesService repositoriesService = repositoriesModule.getRepositoryService();
final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
final ClusterInfoService clusterInfoService = serviceProvider.newClusterInfoService(
pluginsService,
settings,
clusterService,
threadPool,
client
);
final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
settings,
clusterService,
repositoriesService,
rerouteServiceReference::get
);
final ClusterModule clusterModule = new ClusterModule(
settings,
clusterService,
pluginsService.filterPlugins(ClusterPlugin.class).toList(),
clusterInfoService,
snapshotsInfoService,
threadPool,
systemIndices,
projectResolver,
getWriteLoadForecaster(threadPool, settings, clusterService.getClusterSettings()),
telemetryProvider
);
modules.add(clusterModule);
RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterInfoService.addListener(
new DiskThresholdMonitor(
settings,
clusterService::state,
clusterService.getClusterSettings(),
client,
threadPool.relativeTimeInMillisSupplier(),
rerouteService,
projectResolver
)::onNewInfo
);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class).toList());
modules.add(indicesModule);
modules.add(new GatewayModule());
CompatibilityVersions compatibilityVersions = new CompatibilityVersions(
TransportVersion.current(),
systemIndices.getMappingsVersions()
);
modules.add(
loadPersistedClusterStateService(clusterService.getClusterSettings(), threadPool, compatibilityVersions, projectResolver)
);
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client, projectResolver));
}
SourceFieldMetrics sourceFieldMetrics = new SourceFieldMetrics(
telemetryProvider.getMeterRegistry(),
threadPool::relativeTimeInMillis
);
MapperMetrics mapperMetrics = new MapperMetrics(sourceFieldMetrics);
final List<SearchOperationListener> searchOperationListeners = List.of(
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
);
List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
// NOTE: the response of index/search slow log fields below must be calculated dynamically on every call
// because the responses may change dynamically at runtime
SlowLogFieldProvider slowLogFieldProvider = new SlowLogFieldProvider() {
public SlowLogFields create() {
final List<SlowLogFields> fields = new ArrayList<>();
for (var provider : slowLogFieldProviders) {
fields.add(provider.create());
}
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return fields.stream()
.flatMap(f -> f.indexFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, String> searchFields() {
return fields.stream()
.flatMap(f -> f.searchFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, String> queryFields() {
return fields.stream()
.flatMap(f -> f.queryFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
};
}
public SlowLogFields create(IndexSettings indexSettings) {
final List<SlowLogFields> fields = new ArrayList<>();
for (var provider : slowLogFieldProviders) {
fields.add(provider.create(indexSettings));
}
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return fields.stream()
.flatMap(f -> f.indexFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, String> searchFields() {
return fields.stream()
.flatMap(f -> f.searchFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, String> queryFields() {
return fields.stream()
.flatMap(f -> f.queryFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
};
}
};
IndicesService indicesService = new IndicesServiceBuilder().settings(settings)
.pluginsService(pluginsService)
.nodeEnvironment(nodeEnvironment)
.xContentRegistry(xContentRegistry)
.analysisRegistry(analysisRegistry)
.indexNameExpressionResolver(clusterModule.getIndexNameExpressionResolver())
.mapperRegistry(indicesModule.getMapperRegistry())
.namedWriteableRegistry(namedWriteableRegistry)
.threadPool(threadPool)
.indexScopedSettings(settingsModule.getIndexScopedSettings())
.circuitBreakerService(circuitBreakerService)
.bigArrays(bigArrays)
.scriptService(scriptService)
.clusterService(clusterService)
.projectResolver(projectResolver)
.client(client)
.metaStateService(metaStateService)
.valuesSourceRegistry(searchModule.getValuesSourceRegistry())
.requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator())
.mapperMetrics(mapperMetrics)
.searchOperationListeners(searchOperationListeners)
.slowLogFieldProvider(slowLogFieldProvider)
.build();
final var parameters = new IndexSettingProvider.Parameters(clusterService, indicesService::createIndexMapperServiceForValidation);
IndexSettingProviders indexSettingProviders = new IndexSettingProviders(
Sets.union(
builtinIndexSettingProviders(),
pluginsService.flatMap(p -> p.getAdditionalIndexSettingProviders(parameters)).collect(Collectors.toSet())
)
);
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
settings,
clusterService,
indicesService,
clusterModule.getAllocationService(),
shardLimitValidator,
environment,
settingsModule.getIndexScopedSettings(),
threadPool,
xContentRegistry,
systemIndices,
forbidPrivateIndexSettings,
indexSettingProviders
);
final MetadataUpdateSettingsService metadataUpdateSettingsService = new MetadataUpdateSettingsService(
clusterService,
clusterModule.getAllocationService(),
settingsModule.getIndexScopedSettings(),
indicesService,
shardLimitValidator,
threadPool
);
final DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = createDataStreamServicesAndGlobalRetentionResolver(
threadPool,
clusterService,
indicesService,
metadataCreateIndexService
);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
PluginServiceInstances pluginServices = new PluginServiceInstances(
client,
clusterService,
rerouteService,
threadPool,
createResourceWatcherService(settings, threadPool),
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
clusterModule.getIndexNameExpressionResolver(),
repositoriesService,
telemetryProvider,
clusterModule.getAllocationService(),
indicesService,
featureService,
systemIndices,
dataStreamGlobalRetentionSettings,
documentParsingProvider,
taskManager,
projectResolver,
slowLogFieldProvider,
indexingLimits
);
Collection<?> pluginComponents = pluginsService.flatMap(plugin -> {
Collection<?> allItems = plugin.createComponents(pluginServices);
List<?> componentObjects = allItems.stream().filter(not(x -> x instanceof Class<?>)).toList();
List<? extends Class<?>> classes = allItems.stream().filter(x -> x instanceof Class<?>).map(x -> (Class<?>) x).toList();
// Then, injection
Collection<?> componentsFromInjector;
if (classes.isEmpty()) {
componentsFromInjector = Set.of();
} else {
logger.debug("Using injector to instantiate classes for {}: {}", plugin.getClass().getSimpleName(), classes);
var injector = org.elasticsearch.injection.Injector.create();
injector.addInstances(componentObjects);
addRecordContents(injector, pluginServices);
var resultMap = injector.inject(classes);
// For now, assume we want all components added to the Guice injector
var distinctObjects = newSetFromMap(new IdentityHashMap<>());
distinctObjects.addAll(resultMap.values());
componentsFromInjector = distinctObjects;
}
// Return both
return Stream.of(componentObjects, componentsFromInjector).flatMap(Collection::stream).toList();
}).toList();
var terminationHandlers = pluginsService.loadServiceProviders(TerminationHandlerProvider.class)
.stream()
.map(TerminationHandlerProvider::handler);
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
modules.bindToInstance(ResponseCollectorService.class, responseCollectorService);
var reservedStateHandlerProviders = pluginsService.loadServiceProviders(ReservedClusterStateHandlerProvider.class);
ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
namedWriteableRegistry,
settingsModule.getIndexScopedSettings(),
settingsModule.getClusterSettings(),
settingsModule.getSettingsFilter(),
threadPool,
pluginsService.filterPlugins(ActionPlugin.class).toList(),
client,
circuitBreakerService,
createUsageService(),
systemIndices,
telemetryProvider,
clusterService,
rerouteService,
buildReservedClusterStateHandlers(reservedStateHandlerProviders, settingsModule),
buildReservedProjectStateHandlers(
reservedStateHandlerProviders,
settingsModule,
clusterService,
indicesService,
systemIndices,
indexSettingProviders,
metadataCreateIndexService,
dataStreamGlobalRetentionSettings
),
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll),
incrementalBulkService,
projectResolver
);
modules.add(actionModule);
final NetworkService networkService = new NetworkService(
pluginsService.filterPlugins(DiscoveryPlugin.class)
.map(d -> d.getCustomNameResolver(environment.settings()))
.filter(Objects::nonNull)
.toList()
);
final NetworkModule networkModule = new NetworkModule(
settings,
pluginsService.filterPlugins(NetworkPlugin.class).toList(),
threadPool,
bigArrays,
pageCacheRecycler,
circuitBreakerService,
namedWriteableRegistry,
xContentRegistry,
networkService,
actionModule.getRestController(),
actionModule::copyRequestHeadersToThreadContext,
clusterService.getClusterSettings(),
telemetryProvider.getTracer()
);
var indexTemplateMetadataUpgraders = pluginsService.map(Plugin::getIndexTemplateMetadataUpgrader).toList();
List<Map<String, UnaryOperator<Metadata.ProjectCustom>>> customMetadataUpgraders = pluginsService.map(
Plugin::getProjectCustomMetadataUpgraders
).toList();
modules.bindToInstance(MetadataUpgrader.class, new MetadataUpgrader(indexTemplateMetadataUpgraders, customMetadataUpgraders));
final IndexMetadataVerifier indexMetadataVerifier = new IndexMetadataVerifier(
settings,
clusterService,
xContentRegistry,
indicesModule.getMapperRegistry(),
settingsModule.getIndexScopedSettings(),
scriptService,
mapperMetrics
);
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
clusterService.addListener(new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders));
}
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = serviceProvider.newTransportService(
pluginsService,
settings,
transport,
threadPool,
networkModule.getTransportInterceptor(),
localNodeFactory,
settingsModule.getClusterSettings(),
taskManager,
telemetryProvider.getTracer()
);
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
final SearchTransportService searchTransportService = new SearchTransportService(
transportService,
client,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
);
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
SnapshotsService snapshotsService = new SnapshotsService(
settings,
clusterService,
rerouteService,
clusterModule.getIndexNameExpressionResolver(),
repositoriesService,
transportService,
actionModule.getActionFilters(),
systemIndices
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
clusterService,
repositoriesService,
transportService,
indicesService
);
actionModule.getReservedClusterStateService().installClusterStateHandler(new ReservedRepositoryAction(repositoriesService));
actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction());
var fileSettingsHealthIndicatorPublisher = new FileSettingsService.FileSettingsHealthIndicatorPublisherImpl(clusterService, client);
var fileSettingsHealthTracker = new FileSettingsHealthTracker(settings, fileSettingsHealthIndicatorPublisher);
FileSettingsService fileSettingsService = pluginsService.loadSingletonServiceProvider(
FileSettingsServiceProvider.class,
() -> FileSettingsService::new
).construct(clusterService, actionModule.getReservedClusterStateService(), environment, fileSettingsHealthTracker);
RestoreService restoreService = new RestoreService(
clusterService,
repositoriesService,
clusterModule.getAllocationService(),
metadataCreateIndexService,
indexMetadataVerifier,
shardLimitValidator,
systemIndices,
indicesService,
fileSettingsService,
threadPool
);
DiscoveryModule discoveryModule = createDiscoveryModule(
settings,
threadPool,
transportService,
networkService,
clusterService,
clusterModule.getAllocationService(),
rerouteService,
circuitBreakerService,
compatibilityVersions,
featureService
);
nodeService = new NodeService(
settings,
threadPool,
new MonitorService(settings, nodeEnvironment, threadPool),
discoveryModule.getCoordinator(),
transportService,
indicesService,
pluginsService,
circuitBreakerService,
scriptService,
httpServerTransport,
ingestService,
clusterService,
settingsModule.getSettingsFilter(),
responseCollectorService,
searchTransportService,
indexingLimits,
searchModule.getValuesSourceRegistry().getUsageService(),
repositoriesService,
compatibilityVersions
);
final TimeValue metricsInterval = settings.getAsTime("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(10));
final NodeMetrics nodeMetrics = new NodeMetrics(telemetryProvider.getMeterRegistry(), nodeService, metricsInterval);
final IndicesMetrics indicesMetrics = new IndicesMetrics(telemetryProvider.getMeterRegistry(), indicesService, metricsInterval);
OnlinePrewarmingService onlinePrewarmingService = pluginsService.loadSingletonServiceProvider(
OnlinePrewarmingServiceProvider.class,
() -> OnlinePrewarmingServiceProvider.DEFAULT
).create(clusterService.getSettings(), threadPool, clusterService);
final SearchService searchService = serviceProvider.newSearchService(
pluginsService,
clusterService,
indicesService,
threadPool,
scriptService,
bigArrays,
searchModule.getFetchPhase(),
circuitBreakerService,
systemIndices.getExecutorSelector(),
telemetryProvider.getTracer(),
onlinePrewarmingService
);
final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler);
modules.add(loadPersistentTasksService(settingsModule, clusterService, threadPool, clusterModule.getIndexNameExpressionResolver()));
modules.add(
loadPluginShutdownService(clusterService),
loadDiagnosticServices(
settings,
discoveryModule.getCoordinator(),
clusterService,
transportService,
threadPool,
telemetryProvider,
repositoriesService
)
);
RecoveryPlannerService recoveryPlannerService = getRecoveryPlannerService(threadPool, clusterService, repositoriesService);
modules.add(b -> {
serviceProvider.processRecoverySettings(pluginsService, settingsModule.getClusterSettings(), recoverySettings);
SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoriesService);
var peerRecovery = new PeerRecoverySourceService(
transportService,
indicesService,
clusterService,
recoverySettings,
recoveryPlannerService
);
resourcesToClose.add(peerRecovery);
b.bind(PeerRecoverySourceService.class).toInstance(peerRecovery);
b.bind(PeerRecoveryTargetService.class)
.toInstance(
new PeerRecoveryTargetService(
client,
threadPool,
transportService,
recoverySettings,
clusterService,
snapshotFilesProvider
)
);
});
modules.add(loadPluginComponents(pluginComponents));
DataStreamAutoShardingService dataStreamAutoShardingService = new DataStreamAutoShardingService(
settings,
clusterService,
threadPool::absoluteTimeInMillis
);
dataStreamAutoShardingService.init();
modules.add(b -> {
b.bind(NodeService.class).toInstance(nodeService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressure.class).toInstance(indexingLimits);
b.bind(IncrementalBulkService.class).toInstance(incrementalBulkService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(MetaStateService.class).toInstance(metaStateService);
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
b.bind(MetadataUpdateSettingsService.class).toInstance(metadataUpdateSettingsService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchResponseMetrics.class).toInstance(searchResponseMetrics);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NodeMetrics.class).toInstance(nodeMetrics);
b.bind(IndicesMetrics.class).toInstance(indicesMetrics);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(IndexMetadataVerifier.class).toInstance(indexMetadataVerifier);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
b.bind(FeatureService.class).toInstance(featureService);
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
b.bind(RepositoriesService.class).toInstance(repositoriesService);
b.bind(SnapshotsService.class).toInstance(snapshotsService);
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(IndexSettingProviders.class).toInstance(indexSettingProviders);
b.bind(FileSettingsService.class).toInstance(fileSettingsService);
b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService);
b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics);
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService);
});
if (ReadinessService.enabled(environment)) {
modules.bindToInstance(
ReadinessService.class,
serviceProvider.newReadinessService(pluginsService, clusterService, environment)
);
}
injector = modules.createInjector();
postInjection(clusterModule, actionModule, clusterService, transportService, featureService);
}