in server/src/main/java/org/opensearch/node/Node.java [338:979]
protected Node(
final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins,
boolean forbidPrivateIndexSettings
) {
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
try {
Settings tmpSettings = Settings.builder()
.put(initialEnvironment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE)
.build();
// Enable the shard indexing backpressure node attribute
tmpSettings = Settings.builder()
.put(tmpSettings)
.put(NODE_ATTRIBUTES.getKey() + SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY, "true")
.build();
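// Log build, JVM, and OS details up front so the startup log identifies the exact runtime environment.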
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info(
"version[{}], pid[{}], build[{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
Build.CURRENT.getQualifiedVersion(),
jvmInfo.pid(),
Build.CURRENT.type().displayName(),
Build.CURRENT.hash(),
Build.CURRENT.date(),
Constants.OS_NAME,
Constants.OS_VERSION,
Constants.OS_ARCH,
Constants.JVM_VENDOR,
Constants.JVM_NAME,
Constants.JAVA_VERSION,
Constants.JVM_VERSION
);
if (jvmInfo.getBundledJdk()) {
logger.info("JVM home [{}], using bundled JDK [{}]", System.getProperty("java.home"), jvmInfo.getUsingBundledJdk());
} else {
logger.info("JVM home [{}]", System.getProperty("java.home"));
deprecationLogger.deprecate(
"no-jdk",
"no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release"
);
}
logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
if (Build.CURRENT.isProductionRelease() == false) {
logger.warn(
"version [{}] is a pre-release version of OpenSearch and is not suitable for production",
Build.CURRENT.getQualifiedVersion()
);
}
if (logger.isDebugEnabled()) {
logger.debug(
"using config [{}], data [{}], logs [{}], plugins [{}]",
initialEnvironment.configFile(),
Arrays.toString(initialEnvironment.dataFiles()),
initialEnvironment.logsFile(),
initialEnvironment.pluginsFile()
);
}
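// Load modules, installed plugins, and any classpath plugins first; plugins can contribute settings, roles,
// executors, and components that are consumed throughout the rest of the constructor.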
this.pluginsService = new PluginsService(
tmpSettings,
initialEnvironment.configFile(),
initialEnvironment.modulesFile(),
initialEnvironment.pluginsFile(),
classpathPlugins
);
final Settings settings = pluginsService.updatedSettings();
final Set<DiscoveryNodeRole> additionalRoles = pluginsService.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getRoles)
.flatMap(Set::stream)
.collect(Collectors.toSet());
DiscoveryNode.setAdditionalRoles(additionalRoles);
/*
* Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
* values, no matter where they ask for them from.
*/
this.environment = new Environment(settings, initialEnvironment.configFile(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
Environment.assertEquivalent(initialEnvironment, this.environment);
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
NODE_NAME_SETTING.get(tmpSettings),
nodeEnvironment.nodeId(),
ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
DiscoveryNode.getRolesFromSettings(settings)
.stream()
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toCollection(LinkedHashSet::new))
);
resourcesToClose.add(nodeEnvironment);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
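// Build the shared thread pool from core and plugin-provided executor builders; it is terminated explicitly if construction fails.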
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
// register the thread context with HeaderWarning (which backs deprecation warnings) so that it does not need to be injected everywhere
HeaderWarning.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));
final List<Setting<?>> additionalSettings = new ArrayList<>();
// register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
additionalSettings.add(NODE_DATA_SETTING);
additionalSettings.add(NODE_INGEST_SETTING);
additionalSettings.add(NODE_MASTER_SETTING);
additionalSettings.add(NODE_REMOTE_CLUSTER_CLIENT);
additionalSettings.addAll(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
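// The NodeClient is constructed early but only wired to its transport actions once the injector exists (see client.initialize near the end).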
client = new NodeClient(settings, threadPool);
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
// this is as early as we can validate settings; we have already passed them to the ScriptModule and the ThreadPool,
// so for those we may already be too late
final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getSettingUpgraders)
.flatMap(List::stream)
.collect(Collectors.toSet());
final SettingsModule settingsModule = new SettingsModule(
settings,
additionalSettings,
additionalSettingsFilter,
settingsUpgraders
);
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))
);
List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptService);
resourcesToClose.add(clusterService);
final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
if (consistentSettings.isEmpty() == false) {
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher()
);
}
final IngestService ingestService = new IngestService(
clusterService,
threadPool,
this.environment,
scriptService,
analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class),
client
);
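// RepositoriesService is created later by the RepositoriesModule; a SetOnce reference lets earlier components look it up lazily.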
final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before the others, or we can get confusing injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
final FsHealthService fsHealthService = new FsHealthService(
settings,
clusterService.getClusterSettings(),
threadPool,
nodeEnvironment
);
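// The RerouteService is likewise created further below; the snapshots info service resolves it lazily through this reference.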
final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
settings,
clusterService,
repositoriesServiceReference::get,
rerouteServiceReference::get
);
final ClusterModule clusterModule = new ClusterModule(
settings,
clusterService,
clusterPlugins,
clusterInfoService,
snapshotsInfoService,
threadPool.getThreadContext()
);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));
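// Collect plugin-defined circuit breakers, build the breaker service, then hand each plugin the breaker instance registered for it.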
List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
.stream()
.map(plugin -> plugin.getCircuitBreaker(settings))
.collect(Collectors.toList());
final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
settingsModule.getSettings(),
pluginCircuitBreakers,
settingsModule.getClusterSettings()
);
pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
plugin.setCircuitBreaker(breaker);
});
resourcesToClose.add(circuitBreakerService);
modules.add(new GatewayModule());
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
modules.add(settingsModule);
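// Aggregate named writeables and named XContent entries from core modules and plugins into the single registries
// used for stream and XContent (de)serialization.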
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
NetworkModule.getNamedWriteables().stream(),
indicesModule.getNamedWriteables().stream(),
searchModule.getNamedWriteables().stream(),
pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.getNamedWriteables().stream()),
ClusterModule.getNamedWriteables().stream()
).flatMap(Function.identity()).collect(Collectors.toList());
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
Stream.of(
NetworkModule.getNamedXContents().stream(),
IndicesModule.getNamedXContents().stream(),
searchModule.getNamedXContents().stream(),
pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.getNamedXContent().stream()),
ClusterModule.getNamedXWriteables().stream()
).flatMap(Function.identity()).collect(toList())
);
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
final PersistedClusterStateService lucenePersistedStateFactory = new PersistedClusterStateService(
nodeEnvironment,
xContentRegistry,
bigArrays,
clusterService.getClusterSettings(),
threadPool::relativeTimeInMillis
);
// collect engine factory providers from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders = enginePlugins.stream()
.map(plugin -> (Function<IndexSettings, Optional<EngineFactory>>) plugin::getEngineFactory)
.collect(Collectors.toList());
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getDirectoryFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService.filterPlugins(
IndexStorePlugin.class
)
.stream()
.map(IndexStorePlugin::getRecoveryStateFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
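// System index descriptors are collected per declaring plugin so SystemIndices can resolve and protect those indices.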
final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = Collections.unmodifiableMap(
pluginsService.filterPlugins(SystemIndexPlugin.class)
.stream()
.collect(
Collectors.toMap(plugin -> plugin.getClass().getSimpleName(), plugin -> plugin.getSystemIndexDescriptors(settings))
)
);
final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap);
final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
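// The IndicesService is the node-level registry for index and shard machinery; most of the per-index pieces assembled above feed into it.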
final IndicesService indicesService = new IndicesService(
settings,
pluginsService,
nodeEnvironment,
xContentRegistry,
analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(),
indicesModule.getMapperRegistry(),
namedWriteableRegistry,
threadPool,
settingsModule.getIndexScopedSettings(),
circuitBreakerService,
bigArrays,
scriptService,
clusterService,
client,
metaStateService,
engineFactoryProviders,
indexStoreFactories,
searchModule.getValuesSourceRegistry(),
recoveryStateFactories
);
final AliasValidator aliasValidator = new AliasValidator();
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
settings,
clusterService,
indicesService,
clusterModule.getAllocationService(),
aliasValidator,
shardLimitValidator,
environment,
settingsModule.getIndexScopedSettings(),
threadPool,
xContentRegistry,
systemIndices,
forbidPrivateIndexSettings
);
pluginsService.filterPlugins(Plugin.class)
.forEach(
p -> p.getAdditionalIndexSettingProviders().forEach(metadataCreateIndexService::addAdditionalIndexSettingProvider)
);
final MetadataCreateDataStreamService metadataCreateDataStreamService = new MetadataCreateDataStreamService(
threadPool,
clusterService,
metadataCreateIndexService
);
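// Let each plugin create its components against the node-level services it may depend on; the results are bound into the injector below.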
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class)
.stream()
.flatMap(
p -> p.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
clusterModule.getIndexNameExpressionResolver(),
repositoriesServiceReference::get
).stream()
)
.collect(Collectors.toList());
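// The ActionModule registers transport and REST actions (core and plugin-provided); the NetworkModule wires the
// transport and HTTP implementations supplied by NetworkPlugins.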
ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(),
settingsModule.getClusterSettings(),
settingsModule.getSettingsFilter(),
threadPool,
pluginsService.filterPlugins(ActionPlugin.class),
client,
circuitBreakerService,
usageService,
systemIndices
);
modules.add(actionModule);
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(
settings,
pluginsService.filterPlugins(NetworkPlugin.class),
threadPool,
bigArrays,
pageCacheRecycler,
circuitBreakerService,
namedWriteableRegistry,
xContentRegistry,
networkService,
restController,
clusterService.getClusterSettings()
);
Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins(
Plugin.class
).stream().map(Plugin::getIndexTemplateMetadataUpgrader).collect(Collectors.toList());
final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(indexTemplateMetadataUpgraders);
final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(
settings,
xContentRegistry,
indicesModule.getMapperRegistry(),
settingsModule.getIndexScopedSettings(),
systemIndices,
scriptService
);
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
}
new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
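// Create the transport layer; task headers declared by ActionPlugins, plus X-Opaque-Id, are propagated with tasks.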
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(
settings,
transport,
threadPool,
networkModule.getTransportInterceptor(),
localNodeFactory,
settingsModule.getClusterSettings(),
taskHeaders
);
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(
transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
);
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
final IndexingPressureService indexingPressureService = new IndexingPressureService(settings, clusterService);
// Going forward, IndexingPressureService will expose the constructs (listeners/interfaces) needed for plugin
// development; at that point the getter and setter for IndexingPressureService in ClusterService can be deprecated (#478).
clusterService.setIndexingPressureService(indexingPressureService);
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
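// Snapshot and restore wiring: the RepositoriesService created here also satisfies the SetOnce reference taken earlier.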
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class),
transportService,
clusterService,
threadPool,
xContentRegistry,
recoverySettings
);
RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
repositoriesServiceReference.set(repositoryService);
SnapshotsService snapshotsService = new SnapshotsService(
settings,
clusterService,
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
actionModule.getActionFilters()
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
clusterService,
repositoryService,
transportService,
indicesService
);
TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(
threadPool,
clusterService,
transportService,
snapshotShardsService,
actionModule.getActionFilters()
);
RestoreService restoreService = new RestoreService(
clusterService,
repositoryService,
clusterModule.getAllocationService(),
metadataCreateIndexService,
metadataIndexUpgradeService,
clusterService.getClusterSettings(),
shardLimitValidator
);
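// The disk threshold monitor listens for ClusterInfo updates and reacts when disk watermarks are breached.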
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
settings,
clusterService::state,
clusterService.getClusterSettings(),
client,
threadPool::relativeTimeInMillis,
rerouteService
);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
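// Discovery and cluster coordination are assembled from discovery plugins, the allocation service, and gateway metadata.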
final DiscoveryModule discoveryModule = new DiscoveryModule(
settings,
threadPool,
transportService,
namedWriteableRegistry,
networkService,
clusterService.getMasterService(),
clusterService.getClusterApplierService(),
clusterService.getClusterSettings(),
pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService(),
environment.configFile(),
gatewayMetaState,
rerouteService,
fsHealthService
);
this.nodeService = new NodeService(
settings,
threadPool,
monitorService,
discoveryModule.getDiscovery(),
transportService,
indicesService,
pluginsService,
circuitBreakerService,
scriptService,
httpServerTransport,
ingestService,
clusterService,
settingsModule.getSettingsFilter(),
responseCollectorService,
searchTransportService,
indexingPressureService,
searchModule.getValuesSourceRegistry().getUsageService()
);
final SearchService searchService = newSearchService(
clusterService,
indicesService,
threadPool,
scriptService,
bigArrays,
searchModule.getFetchPhase(),
responseCollectorService,
circuitBreakerService
);
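// Persistent tasks: executors contributed by plugins are collected into a registry and driven by the persistent tasks services.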
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
.stream()
.map(
p -> p.getPersistentTasksExecutor(
clusterService,
threadPool,
client,
settingsModule,
clusterModule.getIndexNameExpressionResolver()
)
)
.flatMap(List::stream)
.collect(toList());
final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
final PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(
settings,
registry,
clusterService,
threadPool
);
resourcesToClose.add(persistentTasksClusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
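// Bind every singleton constructed above into Guice so the remaining components (transport actions, REST handlers,
// recovery services) can be injected.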
modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
b.bind(NodeClient.class).toInstance(client);
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(ScriptService.class).toInstance(scriptService);
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader);
b.bind(MetaStateService.class).toInstance(metaStateService);
b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(AliasValidator.class).toInstance(aliasValidator);
b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class)
.toInstance(new SearchPhaseController(namedWriteableRegistry, searchService::aggReduceContextBuilder));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
b.bind(MetadataIndexUpgradeService.class).toInstance(metadataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class)
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
.toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService));
}
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
b.bind(RepositoriesService.class).toInstance(repositoryService);
b.bind(SnapshotsService.class).toInstance(snapshotsService);
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(SystemIndices.class).toInstance(systemIndices);
});
injector = modules.createInjector();
// We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
// The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
// completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
// service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
// reroute, which needs to call into the allocation service. We close the loop here:
clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p)
.collect(Collectors.toList());
pluginLifecycleComponents.addAll(
pluginsService.getGuiceServiceClasses().stream().map(injector::getInstance).collect(Collectors.toList())
);
resourcesToClose.addAll(pluginLifecycleComponents);
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
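// With transport actions now available from the injector, finish initializing the NodeClient created earlier and register the REST handlers.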
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry);
this.namedWriteableRegistry = namedWriteableRegistry;
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
logger.info("initialized");
success = true;
} catch (IOException ex) {
throw new OpenSearchException("failed to bind service", ex);
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
}