in x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java [915:1358]
/**
 * Builds and wires together every node-level ML component and returns the instances
 * Elasticsearch should manage (injection + lifecycle).
 * <p>
 * If ML is disabled via settings only two empty holder objects are returned so that
 * feature-usage reporting still has something to query. Otherwise this method
 * constructs, in dependency order: auditors and persisters, job/datafeed config
 * providers and managers, the native (or dummy) process factories, inference and
 * data-frame-analytics services, memory tracking, autoscaling, and assorted
 * cluster-state listeners. Several results are also stashed into {@code SetOnce}
 * fields on this plugin instance for later use by actions/transport handlers.
 * <p>
 * NOTE(review): statement order matters here — many components take previously
 * constructed ones as constructor arguments, and listeners are registered as a
 * side effect; do not reorder casually.
 *
 * @param services accessors for core node services (client, cluster service,
 *                 thread pool, environment, xContent registry, resolver, telemetry)
 * @return the components to hand back to the node for injection and lifecycle management
 */
public Collection<?> createComponents(PluginServices services) {
    // Pull the core node services out of the aggregate once, up front.
    Client client = services.client();
    ClusterService clusterService = services.clusterService();
    ThreadPool threadPool = services.threadPool();
    Environment environment = services.environment();
    NamedXContentRegistry xContentRegistry = services.xContentRegistry();
    IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver();
    TelemetryProvider telemetryProvider = services.telemetryProvider();

    if (enabled == false) {
        // Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension,
        // both empty if ML is disabled
        return List.of(new JobManagerHolder(), new MachineLearningExtensionHolder());
    }

    // Give the (possibly vendor-supplied) ML extension the node settings before anything else is built.
    machineLearningExtension.get().configure(environment.settings());

    this.mlUpgradeModeActionFilter.set(new MlUpgradeModeActionFilter(clusterService));

    MlIndexTemplateRegistry registry = new MlIndexTemplateRegistry(
        settings,
        clusterService,
        threadPool,
        client,
        machineLearningExtension.get().useIlm(),
        xContentRegistry,
        services.projectResolver()
    );
    // Starts the registry's template management (presumably installs/updates ML index templates
    // in cluster state — managed internally by the registry).
    registry.initialize();

    // Auditors write notification/audit documents; includeNodeInfo is extension-controlled.
    AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(
        client,
        clusterService,
        indexNameExpressionResolver,
        machineLearningExtension.get().includeNodeInfo()
    );
    this.anomalyDetectionAuditor.set(anomalyDetectionAuditor);
    DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(
        client,
        clusterService,
        indexNameExpressionResolver,
        machineLearningExtension.get().includeNodeInfo()
    );
    InferenceAuditor inferenceAuditor = new InferenceAuditor(
        client,
        clusterService,
        indexNameExpressionResolver,
        machineLearningExtension.get().includeNodeInfo()
    );
    this.inferenceAuditor.set(inferenceAuditor);
    SystemAuditor systemAuditor = new SystemAuditor(client, clusterService, indexNameExpressionResolver);
    this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);

    // All ML-internal writes go through a client stamped with the ML origin.
    OriginSettingClient originSettingClient = new OriginSettingClient(client, ML_ORIGIN);
    ResultsPersisterService resultsPersisterService = new ResultsPersisterService(
        threadPool,
        originSettingClient,
        clusterService,
        settings
    );
    AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(resultsPersisterService);

    // Anomaly-detection job persistence/lookup plumbing.
    JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings, indexNameExpressionResolver);
    JobResultsPersister jobResultsPersister = new JobResultsPersister(originSettingClient, resultsPersisterService);
    JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(
        client,
        resultsPersisterService,
        anomalyDetectionAuditor
    );
    JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
    DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry, clusterService);
    this.datafeedConfigProvider.set(datafeedConfigProvider);
    UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);
    JobManager jobManager = new JobManager(
        jobResultsProvider,
        jobResultsPersister,
        clusterService,
        anomalyDetectionAuditor,
        threadPool,
        client,
        notifier,
        xContentRegistry,
        indexNameExpressionResolver,
        // Supplier so the max-model-memory limit is re-read from current cluster state on each use.
        () -> NativeMemoryCalculator.getMaxModelMemoryLimit(clusterService)
    );
    DatafeedManager datafeedManager = new DatafeedManager(
        datafeedConfigProvider,
        jobConfigProvider,
        xContentRegistry,
        settings,
        client
    );

    // special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager if ML is enabled
    JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager);

    NativeStorageProvider nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));

    // Process factories: real native processes when AUTODETECT_PROCESS is on,
    // otherwise no-op/black-hole stand-ins (e.g. for tests or non-native platforms).
    final MlController mlController;
    final AutodetectProcessFactory autodetectProcessFactory;
    final NormalizerProcessFactory normalizerProcessFactory;
    final AnalyticsProcessFactory<AnalyticsResult> analyticsProcessFactory;
    final AnalyticsProcessFactory<MemoryUsageEstimationResult> memoryEstimationProcessFactory;
    final PyTorchProcessFactory pyTorchProcessFactory;
    if (MachineLearningField.AUTODETECT_PROCESS.get(settings)) {
        try {
            NativeController nativeController = NativeController.makeNativeController(
                clusterService.getNodeName(),
                environment,
                xContentRegistry
            );
            autodetectProcessFactory = new NativeAutodetectProcessFactory(
                environment,
                settings,
                nativeController,
                clusterService,
                resultsPersisterService,
                anomalyDetectionAuditor
            );
            normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
            analyticsProcessFactory = new NativeAnalyticsProcessFactory(
                environment,
                nativeController,
                clusterService,
                xContentRegistry,
                resultsPersisterService,
                dataFrameAnalyticsAuditor
            );
            memoryEstimationProcessFactory = new NativeMemoryUsageEstimationProcessFactory(
                environment,
                nativeController,
                clusterService
            );
            pyTorchProcessFactory = new NativePyTorchProcessFactory(environment, nativeController, clusterService);
            mlController = nativeController;
        } catch (IOException e) {
            // The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so
            // only log this at the lowest level of detail. It's almost always "file not found" on a named pipe we expect to be
            // able to connect to, but the thing we really need to know is what stopped the native process creating the named pipe.
            logger.trace("Failed to connect to ML native controller", e);
            throw new ElasticsearchException(
                "Failure running machine learning native code. This could be due to running "
                    + "on an unsupported OS or distribution, missing OS libraries, or a problem with the temp directory. To "
                    + "bypass this problem by running Elasticsearch without machine learning functionality set ["
                    + XPackSettings.MACHINE_LEARNING_ENABLED.getKey()
                    + ": false]."
            );
        }
    } else {
        mlController = new DummyController();
        // Swallow results; the black-hole process only reports crashes.
        autodetectProcessFactory = (
            pipelineId,
            job,
            autodetectParams,
            executorService,
            onProcessCrash) -> new BlackHoleAutodetectProcess(pipelineId, onProcessCrash);
        // factor of 1.0 makes renormalization a no-op
        normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
        analyticsProcessFactory = (jobId, analyticsProcessConfig, hasState, executorService, onProcessCrash) -> null;
        memoryEstimationProcessFactory = (jobId, analyticsProcessConfig, hasState, executorService, onProcessCrash) -> null;
        pyTorchProcessFactory = (task, executorService, afterInputStreamClose, onProcessCrash) -> new BlackHolePyTorchProcess();
    }
    NormalizerFactory normalizerFactory = new NormalizerFactory(
        normalizerProcessFactory,
        threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
    );
    AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(
        settings,
        client,
        threadPool,
        xContentRegistry,
        anomalyDetectionAuditor,
        clusterService,
        jobManager,
        jobResultsProvider,
        jobResultsPersister,
        jobDataCountsPersister,
        anomalyDetectionAnnotationPersister,
        autodetectProcessFactory,
        normalizerFactory,
        nativeStorageProvider,
        indexNameExpressionResolver
    );
    this.autodetectProcessManager.set(autodetectProcessManager);

    // Datafeed runtime: builds datafeed jobs and drives their execution.
    DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(
        client,
        xContentRegistry,
        anomalyDetectionAuditor,
        anomalyDetectionAnnotationPersister,
        System::currentTimeMillis,
        jobResultsPersister,
        settings,
        clusterService
    );
    DatafeedContextProvider datafeedContextProvider = new DatafeedContextProvider(
        jobConfigProvider,
        datafeedConfigProvider,
        jobResultsProvider
    );
    DatafeedRunner datafeedRunner = new DatafeedRunner(
        threadPool,
        client,
        clusterService,
        datafeedJobBuilder,
        System::currentTimeMillis,
        anomalyDetectionAuditor,
        autodetectProcessManager,
        datafeedContextProvider
    );
    this.datafeedRunner.set(datafeedRunner);

    // Inference components
    final TrainedModelStatsService trainedModelStatsService = new TrainedModelStatsService(
        resultsPersisterService,
        originSettingClient,
        indexNameExpressionResolver,
        clusterService,
        threadPool
    );
    final TrainedModelCacheMetadataService trainedModelCacheMetadataService = new TrainedModelCacheMetadataService(
        clusterService,
        client
    );
    final TrainedModelProvider trainedModelProvider = new TrainedModelProvider(
        client,
        trainedModelCacheMetadataService,
        xContentRegistry
    );
    final ModelLoadingService modelLoadingService = new ModelLoadingService(
        trainedModelProvider,
        inferenceAuditor,
        threadPool,
        clusterService,
        trainedModelStatsService,
        settings,
        clusterService.getNodeName(),
        // inferenceModelBreaker is a plugin-level SetOnce — assumed to be populated before this
        // point (TODO confirm where it is set).
        inferenceModelBreaker.get(),
        getLicenseState()
    );
    this.modelLoadingService.set(modelLoadingService);

    this.learningToRankService.set(
        new LearningToRankService(modelLoadingService, trainedModelProvider, services.scriptService(), services.xContentRegistry())
    );

    this.deploymentManager.set(
        new DeploymentManager(
            client,
            xContentRegistry,
            threadPool,
            pyTorchProcessFactory,
            getMaxModelDeploymentsPerNode(),
            inferenceAuditor
        )
    );

    // Data frame analytics components
    AnalyticsProcessManager analyticsProcessManager = new AnalyticsProcessManager(
        settings,
        client,
        threadPool,
        analyticsProcessFactory,
        dataFrameAnalyticsAuditor,
        trainedModelProvider,
        resultsPersisterService,
        EsExecutors.allocatedProcessors(settings)
    );
    MemoryUsageEstimationProcessManager memoryEstimationProcessManager = new MemoryUsageEstimationProcessManager(
        threadPool.generic(),
        threadPool.executor(UTILITY_THREAD_POOL_NAME),
        memoryEstimationProcessFactory
    );
    DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(
        client,
        xContentRegistry,
        dataFrameAnalyticsAuditor,
        clusterService
    );
    // DataFrameAnalyticsManager requires the concrete NodeClient; on a real node the injected
    // client is expected to be one.
    assert client instanceof NodeClient;
    DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager(
        settings,
        (NodeClient) client,
        threadPool,
        clusterService,
        dataFrameAnalyticsConfigProvider,
        analyticsProcessManager,
        dataFrameAnalyticsAuditor,
        indexNameExpressionResolver,
        resultsPersisterService,
        modelLoadingService,
        machineLearningExtension.get().getAnalyticsDestIndexAllowedSettings()
    );
    this.dataFrameAnalyticsManager.set(dataFrameAnalyticsManager);

    // Components shared by anomaly detection and data frame analytics
    MlMemoryTracker memoryTracker = new MlMemoryTracker(
        settings,
        clusterService,
        threadPool,
        jobManager,
        jobResultsProvider,
        dataFrameAnalyticsConfigProvider
    );
    this.memoryTracker.set(memoryTracker);
    MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(
        clusterService,
        datafeedRunner,
        mlController,
        autodetectProcessManager,
        dataFrameAnalyticsManager,
        memoryTracker
    );
    this.mlLifeCycleService.set(mlLifeCycleService);
    MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(
        anomalyDetectionAuditor,
        dataFrameAnalyticsAuditor,
        threadPool,
        clusterService
    );
    // Auto-update service: runs config migrations/rollovers for the listed index patterns
    // (job state, stats, annotations) plus anomaly-result index updates.
    MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(
        threadPool,
        List.of(
            new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver),
            new MlIndexRollover(
                List.of(
                    new MlIndexRollover.IndexPatternAndAlias(
                        AnomalyDetectorsIndex.jobStateIndexPattern(),
                        AnomalyDetectorsIndex.jobStateIndexWriteAlias()
                    ),
                    new MlIndexRollover.IndexPatternAndAlias(MlStatsIndex.indexPattern(), MlStatsIndex.writeAlias()),
                    new MlIndexRollover.IndexPatternAndAlias(AnnotationIndex.INDEX_PATTERN, AnnotationIndex.WRITE_ALIAS_NAME)
                ),
                indexNameExpressionResolver,
                client
            ),
            new MlAnomaliesIndexUpdate(indexNameExpressionResolver, client)
        )
    );
    clusterService.addListener(mlAutoUpdateService);

    // this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
    final InvalidLicenseEnforcer enforcer = new InvalidLicenseEnforcer(
        getLicenseState(),
        threadPool,
        datafeedRunner,
        autodetectProcessManager
    );
    enforcer.listenForLicenseStateChanges();

    // Perform node startup operations
    nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();

    // Availability-zone awareness comes from the extension; the mapper tracks cluster changes.
    AbstractNodeAvailabilityZoneMapper nodeAvailabilityZoneMapper = machineLearningExtension.get()
        .getNodeAvailabilityZoneMapper(settings, clusterService.getClusterSettings());

    clusterService.addListener(nodeAvailabilityZoneMapper);

    // allocation service objects
    final TrainedModelAssignmentService trainedModelAssignmentService = new TrainedModelAssignmentService(
        client,
        clusterService,
        threadPool
    );
    trainedModelAllocationClusterServiceSetOnce.set(
        new TrainedModelAssignmentClusterService(
            settings,
            clusterService,
            threadPool,
            new NodeLoadDetector(memoryTracker),
            systemAuditor,
            nodeAvailabilityZoneMapper,
            client
        )
    );

    mlAutoscalingDeciderService.set(
        new MlAutoscalingDeciderService(memoryTracker, settings, nodeAvailabilityZoneMapper, clusterService)
    );

    AdaptiveAllocationsScalerService adaptiveAllocationsScalerService = new AdaptiveAllocationsScalerService(
        threadPool,
        clusterService,
        client,
        inferenceAuditor,
        telemetryProvider.getMeterRegistry(),
        machineLearningExtension.get().isNlpEnabled()
    );

    MlInitializationService mlInitializationService = new MlInitializationService(
        settings,
        threadPool,
        clusterService,
        client,
        adaptiveAllocationsScalerService,
        mlAssignmentNotifier,
        machineLearningExtension.get().isAnomalyDetectionEnabled(),
        machineLearningExtension.get().isDataFrameAnalyticsEnabled(),
        machineLearningExtension.get().isNlpEnabled()
    );

    MlMetrics mlMetrics = new MlMetrics(
        telemetryProvider.getMeterRegistry(),
        clusterService,
        settings,
        autodetectProcessManager,
        dataFrameAnalyticsManager
    );

    // Everything returned here is handed to the node for dependency injection and
    // lifecycle management; order within the list is not significant.
    return List.of(
        mlLifeCycleService,
        new MlControllerHolder(mlController),
        jobResultsProvider,
        jobResultsPersister,
        jobConfigProvider,
        datafeedConfigProvider,
        jobManager,
        jobManagerHolder,
        autodetectProcessManager,
        mlInitializationService,
        adaptiveAllocationsScalerService,
        jobDataCountsPersister,
        datafeedRunner,
        datafeedManager,
        anomalyDetectionAuditor,
        dataFrameAnalyticsAuditor,
        inferenceAuditor,
        systemAuditor,
        mlAssignmentNotifier,
        mlAutoUpdateService,
        memoryTracker,
        analyticsProcessManager,
        memoryEstimationProcessManager,
        dataFrameAnalyticsConfigProvider,
        nativeStorageProvider,
        modelLoadingService,
        trainedModelCacheMetadataService,
        trainedModelProvider,
        trainedModelAssignmentService,
        trainedModelAllocationClusterServiceSetOnce.get(),
        deploymentManager.get(),
        nodeAvailabilityZoneMapper,
        new MachineLearningExtensionHolder(machineLearningExtension.get()),
        mlMetrics
    );
}