in gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java [109:238]
public void configure(Binder binder) {
LOGGER.info("Configuring bindings for the following service settings: {}", serviceConfig);
// In the current code base, we frequently inject classes instead of interfaces
// As a result, even when the binding is missing, Guice will create an instance of the
// the class and inject it. This interferes with disabling of different services and
// components, because without explicit bindings they will get instantiated anyway.
binder.requireExplicitBindings();
// Optional binder will find the existing binding for T and create additional binding for Optional<T>.
// If none of the specific class binding exist, optional will be "absent".
OptionalBinder.newOptionalBinder(binder, Logger.class);
binder.bind(Logger.class).toInstance(LoggerFactory.getLogger(GobblinServiceManager.class));
binder.bind(Config.class).toInstance(serviceConfig.getInnerConfig());
binder.bind(GobblinServiceConfiguration.class).toInstance(serviceConfig);
// Used by TopologyCatalog and FlowCatalog
GobblinInstanceEnvironment gobblinInstanceEnvironment = StandardGobblinInstanceLauncher.builder()
.withLog(LoggerFactory.getLogger(GobblinServiceManager.class))
.setInstrumentationEnabled(true)
.withSysConfig(serviceConfig.getInnerConfig())
.build();
binder.bind(GobblinInstanceEnvironment.class).toInstance(gobblinInstanceEnvironment);
binder.bindConstant().annotatedWith(Names.named(InjectionNames.SERVICE_NAME)).to(serviceConfig.getServiceName());
binder.bind(FlowConfigsV2Resource.class);
binder.bind(FlowStatusResource.class);
binder.bind(FlowExecutionResource.class);
binder.bind(FlowConfigsV2ResourceHandler.class);
binder.bind(FlowExecutionResourceHandler.class);
binder.bind(FlowExecutionResourceHandlerInterface.class).to(FlowExecutionResourceHandler.class);
/* Note that two instances of the same class can only be differentiated with an `annotatedWith` marker provided at
binding time (optionally bound classes cannot have names associated with them), so both arbiters need to be
explicitly bound to be differentiated. The scheduler lease arbiter is only used in single-active scheduler mode,
while the execution lease arbiter is used in single-active or multi-active execution. */
binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(
ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)).toProvider(
FlowLaunchMultiActiveLeaseArbiterFactory.class);
binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(
ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME)).toProvider(
DagActionProcessingMultiActiveLeaseArbiterFactory.class);
binder.bind(DagActionReminderScheduler.class);
binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
binder.bind(DagManagementDagActionStoreChangeMonitor.class).toProvider(
DagManagementDagActionStoreChangeMonitorFactory.class).in(Singleton.class);
binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
binder.bind(DagManagementStateStore.class).to(MySqlDagManagementStateStore.class);
binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
binder.bind(DagProcFactory.class);
binder.bind(DagProcessingEngine.class);
binder.bind(DagProcessingEngineMetrics.class);
binder.bind(FlowLaunchHandler.class);
binder.bind(MultiActiveLeaseArbiter.class).toProvider(DagActionProcessingMultiActiveLeaseArbiterFactory.class);
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
binder.bind(RequesterService.class).to(NoopRequesterService.class);
binder.bind(SharedFlowMetricsSingleton.class);
binder.bind(FlowCompilationValidationHelper.class);
binder.bind(TopologyCatalog.class);
if (serviceConfig.isTopologySpecFactoryEnabled()) {
binder.bind(TopologySpecFactory.class)
.to(getClassByNameOrAlias(TopologySpecFactory.class, serviceConfig.getInnerConfig(),
ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
}
OptionalBinder.newOptionalBinder(binder, FlowCatalog.class);
if (serviceConfig.isFlowCatalogEnabled()) {
binder.bind(FlowCatalog.class);
}
if (serviceConfig.isJobStatusMonitorEnabled()) {
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
}
binder.bind(FlowStatusGenerator.class);
if (serviceConfig.isSchedulerEnabled()) {
binder.bind(Orchestrator.class);
binder.bind(SchedulerService.class);
binder.bind(GobblinServiceJobScheduler.class);
binder.bind(UserQuotaManager.class)
.to(getClassByNameOrAlias(UserQuotaManager.class, serviceConfig.getInnerConfig(),
ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER));
}
if (serviceConfig.isGitConfigMonitorEnabled()) {
binder.bind(GitConfigMonitor.class);
}
binder.bind(GroupOwnershipService.class)
.to(getClassByNameOrAlias(GroupOwnershipService.class, serviceConfig.getInnerConfig(),
ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS, ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE));
binder.bind(JobStatusRetriever.class)
.to(getClassByNameOrAlias(JobStatusRetriever.class, serviceConfig.getInnerConfig(),
JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
if (serviceConfig.isRestLIServerEnabled()) {
binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class);
}
binder.bind(GobblinServiceManager.class);
binder.bind(ServiceDatabaseProvider.class).to(ServiceDatabaseProviderImpl.class);
binder.bind(ServiceDatabaseProviderImpl.Configuration.class);
binder.bind(ServiceDatabaseManager.class);
binder.bind(MultiContextIssueRepository.class)
.to(getClassByNameOrAlias(MultiContextIssueRepository.class, serviceConfig.getInnerConfig(),
ServiceConfigKeys.ISSUE_REPO_CLASS,
InMemoryMultiContextIssueRepository.class.getName()));
binder.bind(MySqlMultiContextIssueRepository.Configuration.class);
binder.bind(InMemoryMultiContextIssueRepository.Configuration.class);
binder.bind(JobIssueEventHandler.class);
binder.bind(D2Announcer.class).to(NoopD2Announcer.class);
binder.bindConstant().annotatedWith(Names.named(DagProcessingEngine.DEFAULT_JOB_START_DEADLINE_TIME_MS)).to(
DagProcUtils.getDefaultJobStartDeadline(serviceConfig.getInnerConfig()));
LOGGER.info("Bindings configured");
}