// controller/tasks/etcd/ingestion_assignment.go

// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
	"math/rand"
	"os"
	"reflect"
	"time"

	"github.com/uber/aresdb/controller/models"
	mutators "github.com/uber/aresdb/controller/mutators/common"
	"github.com/uber/aresdb/controller/tasks/common"

	"github.com/m3db/m3/src/cluster/services"
	"github.com/uber-go/tally"
	metaCom "github.com/uber/aresdb/metastore/common"
	"github.com/uber/aresdb/utils"
	"github.com/uber/aresdb/utils/consistenthasing"
	"go.uber.org/zap"
)

const (
	assignmentChangeMetricName  = "ingestion_assignment_changed"
	assignmentErrorMetricName   = "ingestion_assignment_error"
	assignmentSuccessMetricName = "ingestion_assignment_success"

	ingestionAssignmentConfigKey = "ingestionAssignmentTask"
	taskTagValue                 = "ingestionAssignmentTask"
)

type ingestionAssignmentTaskConfig struct {
	IntervalInSeconds int `yaml:"intervalInSeconds"`
}

// ingestionAssignmentTask calculates ingestion job assignments to subscriber instances
// given the current state of the cluster
type ingestionAssignmentTask struct {
	zone            string
	environment     string
	intervalSeconds int
	logger          *zap.SugaredLogger
	scope           tally.Scope

	// closing stopChan will:
	// - stop the ongoing leader election, which cleans up leader election state
	// - stop the ongoing task assignment calculation loop
	stopChan chan struct{}

	namespaceMutator   mutators.NamespaceMutator
	jobMutator         mutators.JobMutator
	schemaMutator      mutators.TableSchemaMutator
	subscriberMutator  mutators.SubscriberMutator
	assignmentsMutator mutators.IngestionAssignmentMutator

	etcdServices   services.Services
	leaderElection LeaderElector

	configHashes map[string]configHash
}

type jobSubscriberState struct {
	namespace   string
	subscribers []models.Subscriber
	jobs        []models.JobConfig
}

type configHash struct {
	jobsHash       string
	schemaHash     string
	subscriberHash string
}

// NewIngestionAssignmentTask creates a new instance of ingestionAssignmentTask
func NewIngestionAssignmentTask(p common.IngestionAssignmentTaskParams) common.Task {
	var iaconfig ingestionAssignmentTaskConfig
	logger := p.Logger.With("task", taskTagValue)
	scope := p.Scope.Tagged(map[string]string{"task": taskTagValue})

	if err := p.ConfigProvider.Get(ingestionAssignmentConfigKey).Populate(&iaconfig); err != nil {
		logger.Fatal("failed to load config")
	}

	serviceID := services.NewServiceID().
		SetEnvironment(p.EtcdClient.Environment).
		SetZone(p.EtcdClient.Zone).
		SetName(p.EtcdClient.ServiceName)

	leaderService, err := p.EtcdClient.Services.LeaderService(serviceID, nil)
	if err != nil {
		logger.Fatal("failed to create leader service")
	}

	task := &ingestionAssignmentTask{
		intervalSeconds:    iaconfig.IntervalInSeconds,
		logger:             logger,
		scope:              scope,
		stopChan:           make(chan struct{}, 1),
		zone:               p.EtcdClient.Zone,
		environment:        p.EtcdClient.Environment,
		etcdServices:       p.EtcdClient.Services,
		leaderElection:     NewLeaderElector(leaderService),
		namespaceMutator:   p.NamespaceMutator,
		jobMutator:         p.JobMutator,
		schemaMutator:      p.SchemaMutator,
		assignmentsMutator: p.AssignmentsMutator,
		subscriberMutator:  p.SubscriberMutator,
		configHashes:       make(map[string]configHash),
	}
	return task
}
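// Illustrative usage (a sketch; the wiring below is an assumption, not taken
// from this file — "params" stands for a populated
// common.IngestionAssignmentTaskParams value):
//
//	task := NewIngestionAssignmentTask(params)
//	go task.Run()     // Run blocks, so it is typically launched on its own goroutine
//	defer task.Done() // Done closes stopChan, ending the loop and leader election
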
// Run starts the ingestionAssignmentTask
func (ia *ingestionAssignmentTask) Run() {
	hostName, _ := os.Hostname()

	// wait a random interval to avoid a thundering herd of leader elections on cluster reboot
	waitSeconds := rand.Intn(5)
	time.Sleep(time.Duration(waitSeconds) * time.Second)
	ia.logger.With(
		"host", hostName,
		"waitedSeconds", waitSeconds,
	).Info("start ingestion assignment task after waiting")

	if err := ia.leaderElection.Start(); err != nil {
		ia.logger.With("host", hostName, "error", err.Error()).Error("failed to start leader election")
		ia.scope.Counter("task_failed").Inc(1)
		return
	}
	defer func() {
		err := ia.leaderElection.Close()
		if err != nil {
			ia.logger.Error(err)
		} else {
			ia.logger.With("host", hostName).Infof("stopped leader election")
		}
	}()

	ia.logger.With("host", hostName).Infof("entering ingestion assignment calculation loop")
	ia.startIngestionAssignment(hostName)
	ia.logger.With("host", hostName).Infof("exited ingestion assignment calculation loop")
}

// Done stops the task
func (ia *ingestionAssignmentTask) Done() {
	ia.logger.Info("killing ingestion assignment task")
	close(ia.stopChan)
}

func (ia *ingestionAssignmentTask) startIngestionAssignment(hostName string) {
	for {
		select {
		// wait for the next election status change
		case <-ia.leaderElection.C():
			if !ia.isLeader() {
				continue
			}
		case <-ia.stopChan:
			return
		}
		ia.logger.With("host", hostName).Infof("elected as leader")

		ticker := time.NewTicker(time.Duration(ia.intervalSeconds) * time.Second)
	loop:
		for {
			select {
			case <-ia.leaderElection.C():
				if !ia.isLeader() {
					ia.logger.With("host", hostName).Infof("host is no longer the leader")
					break loop
				}
			case <-ticker.C:
				if !ia.isLeader() {
					ia.logger.With("host", hostName).Infof("host is no longer the leader")
					break loop
				}
				ia.tryRecalculateAllNamespaces()
			case <-ia.stopChan:
				ticker.Stop()
				return
			}
		}
		// stop the ticker once leadership is lost so it does not leak across leadership terms
		ticker.Stop()
	}
}

func (ia *ingestionAssignmentTask) isLeader() bool {
	return ia.leaderElection.Status() == Leader
}

func (ia *ingestionAssignmentTask) checkConfigHashes(namespace string) (hashes configHash, err error) {
	hashes.jobsHash, err = ia.jobMutator.GetHash(namespace)
	if err != nil {
		return
	}
	hashes.schemaHash, err = ia.schemaMutator.GetHash(namespace)
	if err != nil {
		return
	}
	hashes.subscriberHash, err = ia.subscriberMutator.GetHash(namespace)
	if err != nil {
		return
	}
	return
}

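// readCurrentState gathers the live state for a namespace: subscriber
// instances from the heartbeat service, the shard count from the datanode
// placement, and the namespace's job configs joined with their table schemas.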
func (ia *ingestionAssignmentTask) readCurrentState(namespace string) (jobSubscriberState, error) {
	state := jobSubscriberState{
		namespace: namespace,
	}

	subscriberServiceID := services.NewServiceID().
		SetName(utils.SubscriberServiceName(namespace)).
		SetZone(ia.zone).
		SetEnvironment(ia.environment)
	hbSvc, err := ia.etcdServices.HeartbeatService(subscriberServiceID)
	if err != nil {
		return state, err
	}
	subscriberInstances, err := hbSvc.Get()
	if err != nil {
		return state, err
	}
	for _, subscriberInstance := range subscriberInstances {
		state.subscribers = append(state.subscribers, models.Subscriber{
			Name: subscriberInstance,
		})
	}

	datanodeServiceID := services.NewServiceID().
		SetName(utils.DataNodeServiceName(namespace)).
		SetZone(ia.zone).
		SetEnvironment(ia.environment)
	placementService, err := ia.etcdServices.PlacementService(datanodeServiceID, nil)
	if err != nil {
		return state, err
	}
	placement, err := placementService.Placement()
	if err != nil {
		return state, err
	}
	numShards := placement.NumShards()

	state.jobs, err = ia.jobMutator.GetJobs(namespace)
	if err != nil {
		return state, err
	}
	for i, job := range state.jobs {
		var table *metaCom.Table
		table, err = ia.schemaMutator.GetTable(namespace, job.AresTableConfig.Name)
		if err != nil {
			return state, err
		}
		job.AresTableConfig.Table = table
		job.NumShards = numShards
		state.jobs[i] = job
	}

	if len(state.subscribers) == 0 {
		return state, common.ErrNotEnoughSubscribers
	}
	return state, nil
}

func (ia *ingestionAssignmentTask) recalculateForNamespace(ns string) {
	ia.logger.With(
		"namespace", ns,
	).Info("recalculating assignment for namespace")
	state, err := ia.readCurrentState(ns)
	if err != nil {
		ia.logger.Error(err)
		ia.scope.Counter(assignmentErrorMetricName).Inc(1)
		return
	}
	changes, errs := ia.processIngestionAssignment(state)
	if errs > 0 {
		ia.scope.Counter(assignmentErrorMetricName).Inc(int64(errs))
	} else {
		ia.scope.Counter(assignmentChangeMetricName).Inc(int64(changes))
	}
}

func (ia *ingestionAssignmentTask) tryRecalculateAllNamespaces() (errs int) {
	ia.logger.Info("try recalculating ingestion assignments")
	namespaces, err := ia.namespaceMutator.ListNamespaces()
	if err != nil {
		ia.reportError(err, &errs)
		return
	}
	for _, ns := range namespaces {
		hashes, err := ia.checkConfigHashes(ns)
		if err != nil {
			ia.reportError(err, &errs)
			return
		}
		if existingHashes, exist := ia.configHashes[ns]; !exist || hashes != existingHashes {
			ia.recalculateForNamespace(ns)
			ia.configHashes[ns] = hashes
		}
		ia.scope.Counter(assignmentSuccessMetricName).Inc(1)
	}
	ia.logger.Info("ingestion assignments calculation finished")
	return
}

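// processIngestionAssignment computes and reconciles assignments for one
// namespace. Worked example of the split arithmetic (illustrative numbers):
// with 3 subscribers and a job whose StreamingConfig.ProcessorCount is 8,
// processorsPerSubscriber = 8/3 = 2, so the loop emits four chunks of 2
// processors; starting at the ring position of the job's kafka topic, they
// land on subscribers start, start+1, start+2, and (wrapping) start again.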
func (ia *ingestionAssignmentTask) processIngestionAssignment(state jobSubscriberState) (changes, errs int) {
	// build a consistent hash ring where each ring node is a subscriber
	// and the resource key is the kafka topic name.
	// this guarantees minimal change to a topic's ingestion assignment
	// when subscribers join/leave the cluster
	ring := consistenthasing.NewRing()
	for _, subscriber := range state.subscribers {
		err := ring.AddNode(subscriber.Name)
		if err != nil {
			ia.reportError(err, &errs)
			return
		}
	}

	jobAssignments := map[string][]models.JobConfig{}
	// TODO: take subscriber instance capacity into consideration when assigning jobs
	for _, job := range state.jobs {
		processorsNeeded := job.StreamingConfig.ProcessorCount
		if processorsNeeded <= 0 {
			continue
		}
		processorsPerSubscriber := processorsNeeded / len(state.subscribers)
		if processorsPerSubscriber < 1 {
			processorsPerSubscriber = 1
		}
		// calculate the starting node of the assignment based on the kafka topic name
		startingIndex, _ := ring.Get(job.StreamingConfig.Topic)
		for i := 0; processorsNeeded > 0; i++ {
			processorsToAssign := processorsPerSubscriber
			if processorsNeeded < processorsToAssign {
				processorsToAssign = processorsNeeded
			}
			j := job
			j.StreamingConfig.ProcessorCount = processorsToAssign
			index := (startingIndex + i) % len(state.subscribers)
			subscriberName := ring.Nodes[index].ID
			jobAssignments[subscriberName] = append(jobAssignments[subscriberName], j)
			processorsNeeded -= processorsToAssign
		}
	}

	existingAssignments, err := ia.assignmentsMutator.GetIngestionAssignments(state.namespace)
	if err != nil {
		ia.reportError(err, &errs)
		return
	}
	existingAssignmentsMap := map[string]*models.IngestionAssignment{}
	for i := range existingAssignments {
		// index into the slice rather than taking the address of the range
		// variable, which would alias every map value to the same element
		existingAssignmentsMap[existingAssignments[i].Subscriber] = &existingAssignments[i]
	}

	for _, subscriberObj := range state.subscribers {
		subscriber := subscriberObj.Name
		jobAssignment, newExists := jobAssignments[subscriber]
		existingAssignment, oldExist := existingAssignmentsMap[subscriber]
		if newExists {
			if oldExist {
				if !reflect.DeepEqual(existingAssignment.Jobs, jobAssignment) {
					// update existing assignment
					err = ia.assignmentsMutator.UpdateIngestionAssignment(state.namespace, models.IngestionAssignment{
						Subscriber: subscriber,
						Jobs:       jobAssignment,
					})
					if err != nil {
						ia.reportError(err, &errs)
					} else {
						changes++
					}
				}
				// mark as not deleted
				existingAssignmentsMap[subscriber] = nil
			} else {
				// new assignment
				err = ia.assignmentsMutator.AddIngestionAssignment(state.namespace, models.IngestionAssignment{
					Subscriber: subscriber,
					Jobs:       jobAssignment,
				})
				if err != nil {
					ia.reportError(err, &errs)
				} else {
					changes++
				}
			}
		} else if !oldExist {
			// add a dummy assignment so the subscriber is tracked even without jobs
			err = ia.assignmentsMutator.AddIngestionAssignment(state.namespace, models.IngestionAssignment{
				Subscriber: subscriber,
				Jobs:       []models.JobConfig{},
			})
			if err != nil {
				ia.reportError(err, &errs)
			}
		}
	}

	// whatever remains non-nil belongs to subscribers that no longer need an assignment
	for k, v := range existingAssignmentsMap {
		if v != nil {
			ia.logger.With(
				"assignment", k,
			).Info("deleting assignment")
			err = ia.assignmentsMutator.DeleteIngestionAssignment(state.namespace, k)
			if err != nil {
				ia.reportError(err, &errs)
			} else {
				changes++
			}
		}
	}
	return
}

func (ia *ingestionAssignmentTask) reportError(err error, errCount *int) {
	ia.logger.Error(err)
	*errCount++
}
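
// splitProcessors is an illustrative sketch, not part of the original file:
// it reproduces the processor-splitting arithmetic of
// processIngestionAssignment for a single job so the distribution can be
// exercised in isolation. The name and signature are hypothetical.
func splitProcessors(processorCount, numSubscribers, startingIndex int) map[int]int {
	// maps subscriber ring index -> processors assigned to it
	assigned := map[int]int{}
	if processorCount <= 0 || numSubscribers <= 0 {
		return assigned
	}
	perSubscriber := processorCount / numSubscribers
	if perSubscriber < 1 {
		perSubscriber = 1
	}
	for i := 0; processorCount > 0; i++ {
		chunk := perSubscriber
		if processorCount < chunk {
			chunk = processorCount
		}
		assigned[(startingIndex+i)%numSubscribers] += chunk
		processorCount -= chunk
	}
	// e.g. splitProcessors(8, 3, 0) yields map[int]int{0: 4, 1: 2, 2: 2}
	return assigned
}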