controller/tasks/Module.go (48 lines of code) (raw):

package tasks import ( "context" "github.com/uber-go/tally" "github.com/uber/aresdb/cluster/kvstore" mutatorCom "github.com/uber/aresdb/controller/mutators/common" taskCom "github.com/uber/aresdb/controller/tasks/common" "github.com/uber/aresdb/controller/tasks/etcd" "go.uber.org/config" "go.uber.org/fx" "go.uber.org/zap" ) // EtcdTaskParams wraps etcd params with fx type EtcdTaskParams struct { fx.In ConfigProvider config.Provider Logger *zap.SugaredLogger Scope tally.Scope EtcdClient *kvstore.EtcdClient NamespaceMutator mutatorCom.NamespaceMutator JobMutator mutatorCom.JobMutator SchemaMutator mutatorCom.TableSchemaMutator SubscriberMutator mutatorCom.SubscriberMutator AssignmentsMutator mutatorCom.IngestionAssignmentMutator } // InvokeEtcdTask invoke etcd based ingestion assignment task func InvokeEtcdTask(params EtcdTaskParams, Lifecycle fx.Lifecycle) { if params.EtcdClient != nil { params.Logger.Info("initializing ingestion assignment task based on etcd") task := etcd.NewIngestionAssignmentTask(taskCom.IngestionAssignmentTaskParams{ ConfigProvider: params.ConfigProvider, Logger: params.Logger, Scope: params.Scope, EtcdClient: params.EtcdClient, NamespaceMutator: params.NamespaceMutator, SubscriberMutator: params.SubscriberMutator, JobMutator: params.JobMutator, SchemaMutator: params.SchemaMutator, AssignmentsMutator: params.AssignmentsMutator, }) go task.Run() Lifecycle.Append(fx.Hook{ OnStop: func(context.Context) error { task.Done() return nil }, }) } } var Module = fx.Invoke(InvokeEtcdTask)