subscriber/common/job/controller.go (435 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package job wires up the subscriber's job Controller and Drivers via fx.
package job

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/curator-go/curator"
	"github.com/m3db/m3/src/cluster/placement"
	"github.com/m3db/m3/src/cluster/services"
	"github.com/m3db/m3/src/x/instrument"
	controllerCli "github.com/uber/aresdb/controller/client"
	"github.com/uber/aresdb/subscriber/common/rules"
	"github.com/uber/aresdb/subscriber/config"
	"go.uber.org/fx"
	"go.uber.org/zap"
)

// Module configures Drivers and Controller: it provides NewController to the
// fx graph and invokes StartController when the application starts.
var Module = fx.Options(
	fx.Provide(
		NewController,
	),
	fx.Invoke(StartController),
)

// Params defines the base objects for jobConfigs, injected by fx.
type Params struct {
	fx.In

	// LifeCycle is used to register shutdown hooks (e.g. closing zkClient).
	LifeCycle fx.Lifecycle
	// ServiceConfig carries logger, metrics scope, and all service-level config.
	ServiceConfig config.ServiceConfig
	// JobConfigs are the locally-defined job configurations.
	JobConfigs rules.JobConfigs
	// SinkInitFunc creates the sink for a driver.
	SinkInitFunc NewSink
	// ConsumerInitFunc creates the consumer for a driver.
	ConsumerInitFunc NewConsumer
	// DecoderInitFunc creates the message decoder for a driver.
	DecoderInitFunc NewDecoder
}

// Result defines the objects that the job module provides.
type Result struct { fx.Out Controller *Controller } const ( // defaultRefreshInterval is 10 minutes defaultRefreshInterval = 10 ) // Controller is responsible for syncing up with aresDB control type Controller struct { sync.RWMutex serviceConfig config.ServiceConfig // aresControllerClient is aresDB controller client aresControllerClient controllerCli.ControllerClient // Drivers are all running jobs Drivers Drivers // jobNS is current active job namespace jobNS string // aresClusterNS is current active aresDB cluster namespace aresClusterNS string // assignmentHashCode is current assignment hash code assignmentHashCode string // zkClient is zookeeper client zkClient curator.CuratorFramework // etcdServices is etcd services client etcdServices services.Services // sinkInitFunc is func of NewSink sinkInitFunc NewSink // consumerInitFunc is func of NewConsumer consumerInitFunc NewConsumer // decoderInitFunc is func of NewDecoder decoderInitFunc NewDecoder } // ZKNodeSubscriber defines the information stored in ZKNode subscriber type ZKNodeSubscriber struct { // Name is subscriber instanceId Name string `json:"name"` // Host is host name of subscriber Host string `json:"host"` } // NewController creates controller func NewController(params Params) *Controller { params.ServiceConfig.Logger.Info("Creating Controller") aresControllerClient := controllerCli.NewControllerHTTPClient(params.ServiceConfig.ControllerConfig.Address, time.Duration(params.ServiceConfig.ControllerConfig.Timeout)*time.Second, http.Header{ "RPC-Caller": []string{os.Getenv("UDEPLOY_APP_ID")}, "RPC-Service": []string{params.ServiceConfig.ControllerConfig.ServiceName}, }) aresControllerClient.SetNamespace(config.ActiveJobNameSpace) drivers, err := NewDrivers(params, aresControllerClient) if err != nil { params.ServiceConfig.Logger.Panic("Failed to NewDrivers", zap.Error(err)) } if params.ServiceConfig.ControllerConfig.RefreshInterval <= 0 { params.ServiceConfig.Logger.Info("Reset controller 
refreshInterval", zap.Int("from", params.ServiceConfig.ControllerConfig.RefreshInterval), zap.Int("to", defaultRefreshInterval)) params.ServiceConfig.ControllerConfig.RefreshInterval = defaultRefreshInterval } controller := &Controller{ serviceConfig: params.ServiceConfig, aresControllerClient: aresControllerClient, Drivers: drivers, jobNS: config.ActiveJobNameSpace, aresClusterNS: config.ActiveAresNameSpace, assignmentHashCode: "", sinkInitFunc: params.SinkInitFunc, consumerInitFunc: params.ConsumerInitFunc, decoderInitFunc: params.DecoderInitFunc, } if params.ServiceConfig.ControllerConfig.Enable { params.ServiceConfig.Logger.Info("aresDB Controller is enabled") if params.ServiceConfig.HeartbeatConfig != nil && params.ServiceConfig.HeartbeatConfig.Enabled { controller.startEtcdHBService(params) go controller.RestartEtcdHBService(params) } else { controller.zkClient = createZKClient(params) err = controller.zkClient.Start() if err != nil { params.ServiceConfig.Logger.Panic("Failed to start zkClient", zap.Error(err)) } params.ServiceConfig.Logger.Info("zkClient was started") err = controller.RegisterOnZK() if err != nil { params.ServiceConfig.Logger.Panic("Failed to register subscriber", zap.Error(err)) } params.ServiceConfig.Logger.Info("Registered subscriber in zk") } go controller.SyncUpJobConfigs() } params.ServiceConfig.Logger.Info("Controller created", zap.Any("controller", controller)) return controller } func connectEtcdServices(params Params) (services.Services, error) { iopts := instrument.NewOptions(). SetLogger(params.ServiceConfig.Logger). SetMetricsScope(params.ServiceConfig.Scope) etcdConfig := &params.ServiceConfig.EtcdConfig.EtcdConfig // create a config service client to access to the etcd cluster services. 
csClient, err := etcdConfig.NewClient(iopts) if err != nil { params.ServiceConfig.Logger.Error("Failed to NewClient for etcd", zap.Error(err)) return nil, err } servicesClient, err := csClient.Services(nil) if err != nil { params.ServiceConfig.Logger.Error("Failed to create services for etcd", zap.Error(err)) return nil, err } return servicesClient, nil } func registerHeartBeatService(params Params, servicesClient services.Services) error { sid := services.NewServiceID(). SetEnvironment(params.ServiceConfig.EtcdConfig.EtcdConfig.Env). SetZone(params.ServiceConfig.EtcdConfig.EtcdConfig.Zone). SetName(params.ServiceConfig.EtcdConfig.EtcdConfig.Service) err := servicesClient.SetMetadata(sid, services.NewMetadata(). SetHeartbeatInterval(time.Duration(params.ServiceConfig.HeartbeatConfig.Interval)*time.Second). SetLivenessInterval(time.Duration(params.ServiceConfig.HeartbeatConfig.Timeout)*time.Second)) if err != nil { params.ServiceConfig.Logger.Error("Failed to config heartbeart", zap.Error(err)) return err } pInstance := placement.NewInstance(). SetID(params.ServiceConfig.Environment.InstanceID) ad := services.NewAdvertisement(). SetServiceID(sid). 
SetPlacementInstance(pInstance) params.ServiceConfig.Logger.Info("service, placement, and ad info", zap.Any("serviceID", sid), zap.Any("placement", pInstance), zap.Any("ad", ad)) err = servicesClient.Advertise(ad) if err != nil { params.ServiceConfig.Logger.Error("Failed to advertise heartbeat service", zap.Error(err)) } else { params.ServiceConfig.Logger.Info("advertised heartbeat") } return err } func createZKClient(params Params) curator.CuratorFramework { zkConfig := params.ServiceConfig.ZooKeeperConfig lc := params.LifeCycle retryPolicy := curator.NewExponentialBackoffRetry( zkConfig.BaseSleepTimeSeconds*time.Second, zkConfig.MaxRetries, zkConfig.MaxSleepSeconds*time.Second) // Using the CuratorFrameworkBuilder gives fine grained control over creation options builder := &curator.CuratorFrameworkBuilder{ ConnectionTimeout: zkConfig.ConnectionTimeoutSeconds * time.Second, SessionTimeout: zkConfig.SessionTimeoutSeconds * time.Second, RetryPolicy: retryPolicy, } zkClient := builder.ConnectString(zkConfig.Server).Build() lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { params.ServiceConfig.Logger.Info("Close zkClient") return zkClient.Close() }, }) return zkClient } // RegisterOnZK registes aresDB subscriber instance in zookeeper as an ephemeral node func (c *Controller) RegisterOnZK() error { path := fmt.Sprintf("/ares_controller/%s/subscribers/%s", c.jobNS, c.serviceConfig.Environment.InstanceID) subscriber, err := json.Marshal(ZKNodeSubscriber{ Name: c.serviceConfig.Environment.InstanceID, Host: c.serviceConfig.Environment.Hostname, }) if err != nil { return err } _, err = c.zkClient.Create().WithMode(curator.EPHEMERAL).ForPathWithData(path, subscriber) time.Sleep(time.Minute * 1) return err } // SyncUpJobConfigs sync up jobConfigs with aresDB controller func (c *Controller) SyncUpJobConfigs() { c.Lock() defer c.Unlock() // Check if the hash of the assignment is changed or not updateHash, newAssignmentHash := c.updateAssignmentHash() if !updateHash { 
c.serviceConfig.Scope.Counter("syncUp.skipped").Inc(1) return } // Get assignment from aresDB controller since hash is changed assigned, err := c.aresControllerClient.GetAssignment(c.jobNS, c.serviceConfig.Environment.InstanceID) if err != nil { c.serviceConfig.Logger.Error("Failed to get assignment from aresDB controller", zap.String("jobNamespace", c.jobNS), zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), zap.Error(err)) c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1) return } assignment, err := rules.NewAssignmentFromController(assigned) if err != nil { c.serviceConfig.Logger.Error("Failed to populate assignment from controller assignment", zap.String("jobNamespace", c.jobNS), zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), zap.Error(err)) c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1) return } c.serviceConfig.Logger.Info("Got assignment from aresDB controller", zap.String("jobNamespace", c.jobNS), zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), zap.String("activeAresNameSpace", config.ActiveAresNameSpace), zap.Any("aresClusterNSConfig", c.serviceConfig.AresNSConfig), zap.Any("activeAresClusters", c.serviceConfig.ActiveAresClusters), zap.Any("assignement", assignment)) newJobs := make(map[string]*rules.JobConfig) // Add or Update jobs for _, jobConfig := range assignment.Jobs { newJobs[jobConfig.Name] = jobConfig if aresClusterDrivers, ok := c.Drivers[jobConfig.Name]; ok { // case1: existing jobConfig for aresCluster, driver := range aresClusterDrivers { if _, ok := assignment.AresClusters[aresCluster]; !ok { // case1.1: delete the driver because aresCluster is deleted activeAresCluster, exist := c.serviceConfig.ActiveAresClusters[aresCluster] if exist && activeAresCluster.GetSinkMode() != config.Sink_Kafka { c.deleteDriver(driver, aresCluster, aresClusterDrivers) c.serviceConfig.Logger.Info("deleted driver due to the removed aresCluster", zap.String("job", 
jobConfig.Name), zap.String("aresCluster", aresCluster)) } continue } if driver.jobConfig.Version != jobConfig.Version { // case1.2: restart the driver because jobConfig version is changed, if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, true) { updateHash = false c.serviceConfig.Logger.Info("restarted driver due to version changes", zap.String("job", jobConfig.Name), zap.String("aresCluster", aresCluster)) } } } for aresCluster, aresClusterObj := range assignment.AresClusters { if _, ok := aresClusterDrivers[aresCluster]; !ok { // case1.3 add a new driver because a new aresCluster is added c.serviceConfig.ActiveAresClusters[aresCluster] = aresClusterObj if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, false) { updateHash = false c.serviceConfig.Logger.Info("added driver due to the new aresCluster", zap.String("job", jobConfig.Name), zap.String("aresCluster", aresCluster)) } } } } else { // case2: a new jobConfig aresClusterDrivers := make(map[string]*Driver) if len(assignment.AresClusters) != 0 { for aresCluster, aresClusterObj := range assignment.AresClusters { // case2.1: add a new driver for each aresCluster c.serviceConfig.ActiveAresClusters[aresCluster] = aresClusterObj if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, false) { updateHash = false c.serviceConfig.Logger.Info("added driver (aresDB sink) due to the new job", zap.String("job", jobConfig.Name), zap.String("aresCluster", aresCluster)) } } } else { for aresCluster, aresClusterObj := range c.serviceConfig.ActiveAresClusters { if aresClusterObj.GetSinkMode() == config.Sink_Kafka { if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, false) { updateHash = false c.serviceConfig.Logger.Info("added driver (kafka sink) due to the new job", zap.String("job", jobConfig.Name), zap.String("aresCluster", aresCluster)) } } else { c.serviceConfig.Logger.Error("missing aresDB instance in assignment", zap.String("job", jobConfig.Name), zap.String("aresCluster", aresCluster)) 
} } } c.Drivers[jobConfig.Name] = aresClusterDrivers for aresCluster, aresClusterObj := range c.serviceConfig.ActiveAresClusters { // case2.2: delete the aresCluster from ActiveAresClusters because it is deleted from assignment if _, ok := assignment.AresClusters[aresCluster]; !ok && aresClusterObj.GetSinkMode() != config.Sink_Kafka { delete(c.serviceConfig.ActiveAresClusters, aresCluster) } } } } // Delete jobs for jobName, aresClusterDrivers := range c.Drivers { if _, ok := newJobs[jobName]; !ok { // case3: jobConfig is deleted for aresCluster, driver := range aresClusterDrivers { c.deleteDriver(driver, aresCluster, aresClusterDrivers) } c.Drivers[jobName] = nil delete(c.Drivers, jobName) c.serviceConfig.Logger.Info("deleted all drivers", zap.String("job", jobName)) } } // Update local hash codes if updateHash { c.serviceConfig.Logger.Info("Update assignment hash", zap.String("jobNamespace", c.jobNS), zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), zap.String("oldHash", c.assignmentHashCode), zap.String("newHash", newAssignmentHash)) c.assignmentHashCode = newAssignmentHash c.serviceConfig.Scope.Counter("syncUp.succeeded").Inc(1) } c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1) return } func (c *Controller) updateAssignmentHash() (update bool, newHash string) { // get the hash of the assignment oldHash := c.assignmentHashCode newHash, err := c.aresControllerClient.GetAssignmentHash(c.jobNS, c.serviceConfig.Environment.InstanceID) if err != nil { c.serviceConfig.Logger.Error("Failed to get assignment hash from aresDB controller", zap.String("jobNamespace", c.jobNS), zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), zap.Error(err)) return false, "" } if strings.Compare(oldHash, newHash) == 0 { return false, newHash } c.serviceConfig.Logger.Info("Found assignment hash changed", zap.String("jobNamespace", c.jobNS), zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address)) return true, newHash 
} func (c *Controller) addDriver( jobConfig *rules.JobConfig, aresCluster string, aresClusterDrivers map[string]*Driver, stop bool) bool { if !c.startDriver(jobConfig, aresCluster, aresClusterDrivers, false) { c.serviceConfig.Scope.Tagged(map[string]string{ "job": jobConfig.Name, "aresCluster": aresCluster, }).Counter("errors.driver.new").Inc(1) return false } c.serviceConfig.Logger.Info("Added new driver", zap.String("job", jobConfig.Name), zap.String("aresCluster", aresCluster)) return true } func (c *Controller) deleteDriver(driver *Driver, aresCluster string, aresClusterDrivers map[string]*Driver) { driver.Stop() aresClusterDrivers[aresCluster] = nil delete(aresClusterDrivers, aresCluster) delete(c.serviceConfig.ActiveAresClusters, aresCluster) c.serviceConfig.Logger.Info("deleted driver", zap.String("job", driver.JobName), zap.String("aresCluster", aresCluster)) } func (c *Controller) startDriver( jobConfig *rules.JobConfig, aresCluster string, aresClusterDrivers map[string]*Driver, stop bool) bool { // 0. Clone jobConfig clonedJobConfig, err := rules.CloneJobConfig(jobConfig, c.serviceConfig, aresCluster) if err != nil { c.serviceConfig.Logger.Error("Failed to copy job config", zap.String("job", jobConfig.Name), zap.String("aresCluster", aresCluster), zap.Error(err)) return false } // 1. Stop the job driver if stop { aresClusterDrivers[aresCluster].Stop() aresClusterDrivers[aresCluster] = nil } // 2. create a new driver driver, err := NewDriver(clonedJobConfig, c.serviceConfig, c.aresControllerClient, NewStreamingProcessor, c.sinkInitFunc, c.consumerInitFunc, c.decoderInitFunc) if err != nil { c.serviceConfig.Logger.Error("Failed to create driver", zap.String("job", jobConfig.Name), zap.String("cluster", aresCluster), zap.Error(err)) return false } // 3. 
Start the job driver go driver.Start() aresClusterDrivers[aresCluster] = driver return true } func (c *Controller) startEtcdHBService(params Params) { var err error params.ServiceConfig.Logger.Info("heartbeat config", zap.Any("heartbeat", *params.ServiceConfig.HeartbeatConfig)) params.ServiceConfig.EtcdConfig.Lock() c.etcdServices, err = connectEtcdServices(params) if err != nil { params.ServiceConfig.Logger.Panic("Failed to createEtcdServices", zap.Error(err)) } if registerHeartBeatService(params, c.etcdServices) != nil { params.ServiceConfig.Logger.Panic("Failed to registerHeartBeatService", zap.Error(err)) } params.ServiceConfig.EtcdConfig.Unlock() } // RestartEtcdHBService registers heartbeat again if etcd cluster changes are detected func (c *Controller) RestartEtcdHBService(params Params) { for { select { case <-config.EtcdCfgEvent: // TODO: unadevertises old heartbeat and closes etcd client should be added once M3 provides c.serviceConfig.Logger.Info("RestartEtcdHBService") c.startEtcdHBService(params) } } } // StartController starts periodically sync up with aresDB controller func StartController(c *Controller) { if !c.serviceConfig.ControllerConfig.Enable { c.serviceConfig.Logger.Info("aresDB Controller is disabled") return } c.serviceConfig.Logger.Info("Start Controller") ticks := time.Tick(time.Duration(c.serviceConfig.ControllerConfig.RefreshInterval) * time.Minute) go func() { for { select { case <-ticks: c.serviceConfig.Logger.Info("Start sync up with aresDB controller") c.SyncUpJobConfigs() c.serviceConfig.Logger.Info("Done sync up with aresDB controller") } } }() }