subscriber/common/job/driver.go (388 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package job import ( "bytes" "encoding/json" "fmt" "net/http" "strconv" "sync" "time" controllerCli "github.com/uber/aresdb/controller/client" "github.com/uber-go/tally" "github.com/uber/aresdb/subscriber/common/rules" "github.com/uber/aresdb/subscriber/config" "go.uber.org/zap" ) // Drivers contains information about job, ares cluster and its driver type Drivers map[string]map[string]*Driver // Driver will initialize and start the Processor's based on the JobConfig provided type Driver struct { sync.RWMutex Topic string StartTime time.Time Shutdown bool JobName string AresCluster string TotalProcessors int RunningProcessors int StoppedProcessors int FailedProcessors int RestartingProcessors int ProcessorContext map[string]*ProcessorContext // driver related variable waitGroup *sync.WaitGroup shutdown chan bool jobConfig *rules.JobConfig serviceConfig config.ServiceConfig scope tally.Scope errors chan ProcessorError errorThreshold int statusCheckInterval int // processors related variables processorInitFunc NewProcessor processorCounter uint processors []Processor processorMsgCount map[int]int64 processorMsgSizes chan int64 aresControllerClient controllerCli.ControllerClient sinkInitFunc NewSink consumerInitFunc NewConsumer decoderInitFunc NewDecoder } // NewProcessor is the type of function each processor that implements Processor should provide for initialization // This function implementation should always return a new instance of the processor type NewProcessor func(id int, jobConfig *rules.JobConfig, aresControllerClient controllerCli.ControllerClient, sinkInitFunc NewSink, consumerInitFunc NewConsumer, decoderInitFunc NewDecoder, errors chan ProcessorError, msgSizes chan int64, serviceConfig config.ServiceConfig) (Processor, error) // NewDrivers return Drivers func NewDrivers(params Params, aresControllerClient controllerCli.ControllerClient) (Drivers, error) { drivers := make(Drivers) // iterate local job configs for jobName, jobAresConfig := range params.JobConfigs { // iterate all active ares clusters aresClusterDriverTable := make(map[string]*Driver) for aresCluster, jobConfig := range jobAresConfig { driver, err := NewDriver(jobConfig, params.ServiceConfig, aresControllerClient, NewStreamingProcessor, params.SinkInitFunc, params.ConsumerInitFunc, params.DecoderInitFunc) if err != nil { params.ServiceConfig.Logger.Error("Failed to create driver", zap.String("job", jobName), zap.String("cluster", jobConfig.AresTableConfig.Cluster), zap.Error(err)) return nil, fmt.Errorf("Failed to create Drivers, reason: job %s cluster %s failed to create driver", jobName, aresCluster) } go driver.Start() aresClusterDriverTable[aresCluster] = driver } drivers[jobName] = aresClusterDriverTable } return drivers, nil } // NewDriver will return a new Driver instance to start a ingest job func NewDriver( jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig, aresControllerClient controllerCli.ControllerClient, processorInitFunc NewProcessor, sinkInitFunc NewSink, consumerInitFunc NewConsumer, decoderInitFunc NewDecoder) (*Driver, error) { return &Driver{ Topic: jobConfig.StreamingConfig.Topic, JobName: jobConfig.Name, AresCluster: jobConfig.AresTableConfig.Cluster, StartTime: time.Now(), Shutdown: false, TotalProcessors: jobConfig.StreamingConfig.ProcessorCount, StoppedProcessors: 0, RestartingProcessors: 0, ProcessorContext: make(map[string]*ProcessorContext), waitGroup: &sync.WaitGroup{}, shutdown: make(chan bool), jobConfig: jobConfig, serviceConfig: serviceConfig, scope: serviceConfig.Scope.Tagged(map[string]string{ "job": jobConfig.Name, "aresCluster": jobConfig.AresTableConfig.Cluster, }), errors: make(chan ProcessorError), errorThreshold: jobConfig.StreamingConfig.ErrorThreshold, statusCheckInterval: jobConfig.StreamingConfig.StatusCheckInterval, processorInitFunc: processorInitFunc, processorCounter: uint(0), processors: []Processor{}, processorMsgCount: make(map[int]int64), processorMsgSizes: make(chan int64), aresControllerClient: aresControllerClient, sinkInitFunc: sinkInitFunc, consumerInitFunc: consumerInitFunc, decoderInitFunc: decoderInitFunc, }, nil } // Start will start the Driver, which starts Processor's to read from Kafka consumer, process // and save to database func (d *Driver) Start() { go d.monitorStatus(time.NewTicker(time.Duration(d.statusCheckInterval) * time.Second)) go d.monitorErrors() go d.limitRate() d.addProcessors(d.jobConfig.StreamingConfig.ProcessorCount) } func (d *Driver) addProcessors(count int) { for i := 0; i < count; i++ { err := d.AddProcessor() if err != nil { d.serviceConfig.Scope.Counter("errors.driver.addprocessor").Inc(1) } // adding too many consumers at once can make some consumers stuck with no messages time.Sleep(500 * time.Millisecond) } } // AddProcessor will add a new processor to driver func (d *Driver) AddProcessor() (err error) { d.Lock() defer func() { if err != nil { d.processorCounter-- } d.Unlock() }() d.serviceConfig.Logger.Info("Adding a new processor", zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) // get a new ID for processor, we start at 1 d.processorCounter++ ID := int(d.processorCounter) processor, err := d.processorInitFunc(ID, d.jobConfig, d.aresControllerClient, d.sinkInitFunc, d.consumerInitFunc, d.decoderInitFunc, d.errors, d.processorMsgSizes, d.serviceConfig) if err != nil { d.serviceConfig.Logger.Error("Failed to initialize Processor", zap.String("job", d.JobName), zap.String("cluster", d.AresCluster), zap.Error(err)) return } // add processor d.processors = append(d.processors, processor) d.ProcessorContext[strconv.Itoa(ID)] = processor.GetContext() // start processor go processor.Run() // Update context d.TotalProcessors++ d.RunningProcessors++ return } // RemoveProcessor will remove a processor from driver. func (d *Driver) RemoveProcessor(ID int) bool { d.Lock() defer d.Unlock() return d.removeProcessor(ID) } func (d *Driver) removeProcessor(ID int) bool { // if no processors running, nothing to remove if d.RunningProcessors <= 0 { return false } if ID <= 0 { // pick last running processor ID = len(d.processors) - 1 for d.processors[ID] == nil || d.processors[ID].GetContext().Stopped { ID-- } } else { ID = ID - 1 } d.serviceConfig.Logger.Info("Removing a processor", zap.Int("ID", ID+1), zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) // stop the processor if !d.processors[ID].GetContext().Stopped { d.processors[ID].Stop() } else { d.serviceConfig.Logger.Warn("Processor already stopped", zap.Int("ID", ID), zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) return false } // update context pc := d.ProcessorContext[strconv.Itoa(ID+1)] pc.Lock() pc.Stopped = true pc.Unlock() d.StoppedProcessors++ d.TotalProcessors-- d.RunningProcessors-- // remove the processor d.processors[ID] = nil // remove the processor id from message count map delete(d.processorMsgCount, ID+1) return true } // restartProcessor will restart a processor from driver. func (d *Driver) restartProcessor(ID int) { d.Lock() defer d.Unlock() if d.processors[ID-1] != nil { go d.processors[ID-1].Restart() } } // MarshalJSON marshal driver into json func (d *Driver) MarshalJSON() ([]byte, error) { d.RLock() defer d.RUnlock() buf := new(bytes.Buffer) startTimeJSON, err := d.StartTime.MarshalJSON() if err != nil { return nil, err } processorContextJSON, err := json.Marshal(d.ProcessorContext) if err != nil { return nil, err } buf.WriteString(fmt.Sprintf(`{"environment":"%s","topic":"%s","startTime":%s,`+ `"jobName":"%s","totalProcessors":%d,"runningProcessors":%d,`+ `"stoppedProcessors":%d,"failedProcessors":%d,"restartingProcessors":%d,"processorContext":%s}`, d.serviceConfig.Environment.Deployment, d.Topic, string(startTimeJSON), d.JobName, d.TotalProcessors, d.RunningProcessors, d.StoppedProcessors, d.FailedProcessors, d.RestartingProcessors, string(processorContextJSON), )) return buf.Bytes(), nil } // WriteContext writes context func (d *Driver) WriteContext(w http.ResponseWriter) { ctxStr, err := d.MarshalJSON() if err != nil { d.serviceConfig.Logger.Error("Unable to marshal context", zap.Error(err)) } w.Write(ctxStr) } // limitRate will rate limit the amount of data read from Kafka, so to // not saturate the network bandwidth on mesos compute hosts func (d *Driver) limitRate() { d.waitGroup.Add(1) defer d.waitGroup.Done() // only one goroutine read/write this value var cumulativeSizeInBytes int64 var mask int64 = 1<<20 - 1 maxTokens := d.jobConfig.StreamingConfig.MegaBytePerSec tokens := make(chan bool, maxTokens) ticks := time.NewTicker(time.Second) d.waitGroup.Add(1) go func() { defer d.waitGroup.Done() defer ticks.Stop() // drain tokens every second to allow more for { select { case <-ticks.C: numExistingTokens := len(tokens) for i := 0; i < numExistingTokens; i++ { <-tokens } case <-d.shutdown: return } } }() for { select { case msgSize := <-d.processorMsgSizes: cumulativeSizeInBytes += msgSize tokensToIssue := int(cumulativeSizeInBytes >> 20) cumulativeSizeInBytes = cumulativeSizeInBytes & mask // try produce tokens for i := 0; i < tokensToIssue; i++ { tokens <- true } case <-d.shutdown: d.serviceConfig.Logger.Info("Shutdown driver limitRate:", zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) return } } } // monitorStatus will monitor the status of all Processor's and report them to graphite func (d *Driver) monitorStatus(ticker *time.Ticker) { d.waitGroup.Add(1) defer d.waitGroup.Done() defer ticker.Stop() for { select { case <-ticker.C: failedProcessors := 0 stoppedProcessors := 0 restartingProcessors := 0 // first add processors in case processor failed to be initialized at the begining d.addProcessors(d.jobConfig.StreamingConfig.ProcessorCount - len(d.ProcessorContext)) d.Lock() for i, p := range d.ProcessorContext { p.RLock() // if shutdown not intentional, but processor is dead, trouble! if !p.Shutdown && p.Stopped { failedProcessors++ } if p.Shutdown && p.Stopped { stoppedProcessors++ } if p.Restarting { restartingProcessors++ } // Check if messages are processed by all processors id, _ := strconv.Atoi(i) // Update new message count for comparing with next check d.processorMsgCount[id] = p.TotalMessages p.RUnlock() } d.scope.Gauge("processors.failed").Update(float64(failedProcessors)) total := len(d.ProcessorContext) - stoppedProcessors + restartingProcessors healthy := total - failedProcessors d.RunningProcessors = healthy - restartingProcessors d.TotalProcessors = total d.RestartingProcessors = restartingProcessors d.serviceConfig.Logger.Debug("Processors:", zap.Int("Configured", d.jobConfig.StreamingConfig.ProcessorCount), zap.String("job", d.JobName), zap.String("cluster", d.AresCluster), zap.Int("Total", total), zap.Int("Running", healthy)) if diff := healthy - d.jobConfig.StreamingConfig.ProcessorCount; diff < 0 { d.scope.Gauge("numProcessors").Update(float64(-diff)) } else { d.scope.Gauge("numProcessors").Update(0) } d.Unlock() case <-d.shutdown: d.serviceConfig.Logger.Info("Shutdown driver monitorStatus:", zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) return } } } // monitorErrors will monitor errors generated by each Processor and kill them if exceeds // configured threshold number of errors per processor func (d *Driver) monitorErrors() { d.waitGroup.Add(1) defer d.waitGroup.Done() processorErrorCount := make(map[int]int) for { select { case err := <-d.errors: if d.processors[err.ID-1] != nil { if err.Timestamp > d.processors[err.ID-1].GetContext().RestartTime { // skip errors which are before restart if val, ok := processorErrorCount[err.ID]; !ok { processorErrorCount[err.ID] = 1 } else { processorErrorCount[err.ID] = val + 1 } } } else { d.serviceConfig.Logger.Warn("No processor exist for", zap.Int("ID", err.ID), zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) } if processorErrorCount[err.ID] == d.errorThreshold { d.serviceConfig.Logger.Info("Restarting processor for reaching error threshold", zap.Int("ID", err.ID), zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) if d.jobConfig.StreamingConfig.RestartOnFailure { // reset processor error count processorErrorCount[err.ID] = 0 d.restartProcessor(err.ID) time.Sleep(time.Millisecond * 10) } else { d.RemoveProcessor(err.ID) } } case <-d.shutdown: d.serviceConfig.Logger.Info("Shutdown driver monitorErrors:", zap.String("job", d.JobName), zap.String("cluster", d.AresCluster)) return } } } // GetErrors returns errors func (d *Driver) GetErrors() chan ProcessorError { return d.errors } // Stop will shutdown driver and its processors func (d *Driver) Stop() { defer func() { d.Shutdown = true d.Unlock() }() d.Lock() // Shutdown all processors for _, processor := range d.processors { if processor == nil { continue } d.removeProcessor(processor.GetID()) } if !d.Shutdown { close(d.shutdown) } d.waitGroup.Wait() }