subscriber/common/job/streaming_processor.go

// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package job

import (
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/uber-go/tally"
	"github.com/uber/aresdb/client"
	controllerCli "github.com/uber/aresdb/controller/client"
	"github.com/uber/aresdb/subscriber/common/consumer"
	"github.com/uber/aresdb/subscriber/common/message"
	"github.com/uber/aresdb/subscriber/common/rules"
	"github.com/uber/aresdb/subscriber/common/sink"
	"github.com/uber/aresdb/subscriber/common/tools"
	"github.com/uber/aresdb/subscriber/config"
	"github.com/uber/aresdb/utils"
	"go.uber.org/zap"
)

// NewConsumer is the type of function each consumer that implements Consumer
// should provide for initialization.
type NewConsumer func(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error)

// NewDecoder is the type of function each decoder that implements Decoder
// should provide for initialization.
type NewDecoder func(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (decoder message.Decoder, err error)

// NewSink is the type of function each sink that implements Sink should
// provide for initialization.
type NewSink func(
	serviceConfig config.ServiceConfig, jobConfig *rules.JobConfig, cluster string,
	sinkCfg config.SinkConfig, aresControllerClient controllerCli.ControllerClient) (sink.Sink, error)

// StreamingProcessor defines an individual processor that connects to a Kafka
// high-level consumer, processes the messages based on the type of job and
// saves them to the database.
type StreamingProcessor struct {
	ID                   int
	context              *ProcessorContext
	jobConfig            *rules.JobConfig
	cluster              string
	serviceConfig        config.ServiceConfig
	scope                tally.Scope
	aresControllerClient controllerCli.ControllerClient
	sink                 sink.Sink
	sinkInitFunc         NewSink
	highLevelConsumer    consumer.Consumer
	consumerInitFunc     NewConsumer
	parser               *message.Parser
	decoder              message.Decoder
	batcher              *tools.Batcher
	msgSizes             chan int64
	shutdown             chan bool
	close                chan bool
	errors               chan ProcessorError
	failureHandler       FailureHandler
}
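// The sketch below is illustrative and not part of the original file: a
// minimal NewDecoder factory. jsonDecoder and newJSONDecoder are hypothetical
// names, and the sketch assumes message.Decoder is satisfied by a
// DecodeMsg(consumer.Message) (*message.Message, error) method (matching how
// the decoder is invoked in decodeMessage below) and that "encoding/json" is
// imported.
//
//	type jsonDecoder struct{}
//
//	func (d *jsonDecoder) DecodeMsg(msg consumer.Message) (*message.Message, error) {
//		var decoded map[string]interface{}
//		if err := json.Unmarshal(msg.Value(), &decoded); err != nil {
//			return nil, err
//		}
//		return &message.Message{
//			RawMessage:     msg,
//			DecodedMessage: map[string]interface{}{message.MsgPrefix: decoded},
//		}, nil
//	}
//
//	func newJSONDecoder(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (message.Decoder, error) {
//		return &jsonDecoder{}, nil
//	}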
// NewStreamingProcessor returns a Processor that consumes, processes and
// saves data to the db.
func NewStreamingProcessor(id int, jobConfig *rules.JobConfig, aresControllerClient controllerCli.ControllerClient,
	sinkInitFunc NewSink, consumerInitFunc NewConsumer, decoderInitFunc NewDecoder,
	errors chan ProcessorError, msgSizes chan int64, serviceConfig config.ServiceConfig) (Processor, error) {
	var (
		err        error
		db         sink.Sink
		hlConsumer consumer.Consumer
		decoder    message.Decoder
	)

	// Release any partially initialized resources if a later step fails.
	defer func() {
		if err == nil {
			return
		}
		if db != nil {
			db.Shutdown()
		}
		if hlConsumer != nil {
			hlConsumer.Close()
		}
		if decoder != nil {
			decoder = nil
		}
	}()

	cluster := jobConfig.AresTableConfig.Cluster

	// Initialize downstream DB
	db, err = initSink(jobConfig, serviceConfig, aresControllerClient, sinkInitFunc)
	if err != nil {
		return nil, utils.StackError(err, fmt.Sprintf(
			"Failed to initialize database connection for job: %s, cluster: %s", jobConfig.Name, cluster))
	}

	// Initialize failure handler
	failureHandler := initFailureHandler(serviceConfig, jobConfig, db)

	// Initialize Kafka consumer
	hlConsumer, err = consumerInitFunc(jobConfig, serviceConfig)
	if err != nil {
		return nil, utils.StackError(err, fmt.Sprintf(
			"Unable to initialize Kafka consumer for job: %s, cluster: %s", jobConfig.Name, cluster))
	}

	// Initialize the decoder based on topic
	decoder, err = decoderInitFunc(jobConfig, serviceConfig)
	if err != nil {
		return nil, utils.StackError(err, fmt.Sprintf(
			"Unable to initialize Kafka message decoder for job: %s, cluster: %s", jobConfig.Name, cluster))
	}

	// Initialize message parser
	parser := message.NewParser(jobConfig, serviceConfig)

	processor := &StreamingProcessor{
		ID:            id,
		jobConfig:     jobConfig,
		cluster:       cluster,
		serviceConfig: serviceConfig,
		scope: serviceConfig.Scope.Tagged(map[string]string{
			"job":         jobConfig.Name,
			"aresCluster": jobConfig.AresTableConfig.Cluster,
		}),
		aresControllerClient: aresControllerClient,
		sink:                 db,
		sinkInitFunc:         sinkInitFunc,
		failureHandler:       failureHandler,
		highLevelConsumer:    hlConsumer,
		consumerInitFunc:     consumerInitFunc,
		msgSizes:             msgSizes,
		parser:               parser,
		decoder:              decoder,
		shutdown:             make(chan bool),
		close:                make(chan bool),
		errors:               errors,
		context: &ProcessorContext{
			StartTime: time.Now(),
			Errors: processorErrors{
				errors: make([]ProcessorError, jobConfig.StreamingConfig.ErrorThreshold*10),
			},
		},
	}
	processor.initBatcher()

	return processor, nil
}

func initFailureHandler(serviceConfig config.ServiceConfig, jobConfig *rules.JobConfig, db sink.Sink) FailureHandler {
	if jobConfig.StreamingConfig.FailureHandler.Type == retryHandler {
		return NewRetryFailureHandler(
			jobConfig.StreamingConfig.FailureHandler.Config, serviceConfig, db, jobConfig.Name)
	}
	return nil
}

// initSink will initialize the sink for writing ingested data
func initSink(
	jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig,
	aresControllerClient controllerCli.ControllerClient, sinkInitFunc NewSink) (sink.Sink, error) {
	cluster := jobConfig.AresTableConfig.Cluster
	serviceConfig.Logger.Info("Initialize database",
		zap.String("job", jobConfig.Name),
		zap.String("cluster", cluster))

	var aresConfig config.SinkConfig
	var ok bool
	if aresConfig, ok = serviceConfig.ActiveAresClusters[cluster]; !ok {
		return nil, fmt.Errorf("Failed to get ares config for job: %s, cluster: %s", jobConfig.Name, cluster)
	}

	return sinkInitFunc(serviceConfig, jobConfig, cluster, aresConfig, aresControllerClient)
}

// GetID will return the ID of this processor
func (s *StreamingProcessor) GetID() int {
	return s.ID
}

// GetContext will return the context of this processor
func (s *StreamingProcessor) GetContext() *ProcessorContext {
	return s.context
}
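// Illustrative usage sketch, not part of the original file: wiring
// NewStreamingProcessor with concrete factory functions. newAresSink,
// newKafkaConsumer and newJSONDecoder are hypothetical NewSink, NewConsumer
// and NewDecoder implementations; jobConfig, controllerClient and
// serviceConfig are assumed to be set up by the caller.
//
//	errCh := make(chan ProcessorError, 64)
//	sizeCh := make(chan int64, 64)
//	p, err := NewStreamingProcessor(0, jobConfig, controllerClient,
//		newAresSink, newKafkaConsumer, newJSONDecoder,
//		errCh, sizeCh, serviceConfig)
//	if err != nil {
//		// the deferred cleanup in NewStreamingProcessor has already released
//		// whatever was partially initialized
//		return err
//	}
//	go p.Run()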
// Stop will stop the processor
func (s *StreamingProcessor) Stop() {
	s.context.Lock()
	if s.context.Restarting {
		s.context.Restarting = false
	} else if !s.context.Shutdown {
		close(s.shutdown)
	}
	s.context.Unlock()
}

// Restart will stop the processor and start the process again in case a
// failure is detected
func (s *StreamingProcessor) Restart() {
	s.context.Lock()
	if s.context.Restarting {
		s.serviceConfig.Logger.Info("Restarting: processor already restarting",
			zap.String("job", s.jobConfig.Name),
			zap.String("cluster", s.cluster))
		s.context.Unlock()
		return
	}
	if s.context.Stopped || s.context.Shutdown {
		s.serviceConfig.Logger.Info("Restarting: processor already stopped",
			zap.String("job", s.jobConfig.Name),
			zap.String("cluster", s.cluster))
		s.context.Unlock()
		return
	}
	s.context.Restarting = true
	s.context.RestartCount++
	s.context.RestartTime = time.Now().UnixNano() / int64(time.Millisecond)
	s.context.Unlock()

	s.serviceConfig.Logger.Info("Restarting processor (stop original processor)",
		zap.Int("ID", s.ID),
		zap.String("job", s.jobConfig.Name),
		zap.String("cluster", s.cluster))

	// not calling Stop() due to different flag setting
	close(s.shutdown)

	// waiting for the original goroutine to stop
	for {
		time.Sleep(time.Millisecond)
		if s.context.Stopped {
			break
		}
	}

	// wait for some time to avoid restarting repeatedly within a short period,
	// or quit if Stop is called during the restart
	timeTick := time.NewTicker(time.Second)
	timer := time.NewTimer(time.Second * time.Duration(s.jobConfig.StreamingConfig.RestartInterval))
	defer func() {
		s.context.Lock()
		s.context.Restarting = false
		timeTick.Stop()
		timer.Stop()
		s.context.Unlock()
	}()

loop:
	for {
		select {
		case <-timeTick.C:
			if !s.context.Restarting {
				s.serviceConfig.Logger.Info("Restarting interrupted, giving up",
					zap.String("job", s.jobConfig.Name),
					zap.String("cluster", s.cluster))
				return
			}
		case <-timer.C:
			break loop
		}
	}

	err := s.reInitialize()
	if err == nil {
		s.serviceConfig.Logger.Info("Restarting processor (re-initialize)",
			zap.Int("ID", s.ID),
			zap.String("job", s.jobConfig.Name),
			zap.String("cluster", s.cluster))
		// restart
		go s.Run()
	} else {
		// quit restarting if we failed to re-initialize the database or consumer
		s.serviceConfig.Logger.Error("Failed to restart processor",
			zap.Int("ID", s.ID),
			zap.String("job", s.jobConfig.Name),
			zap.String("cluster", s.cluster),
			zap.Error(err))
	}
}

// reInitialize re-creates the sink, consumer, failure handler and batcher
// before a restart
func (s *StreamingProcessor) reInitialize() error {
	// maybe we can try to re-initialize if anything failed
	s.serviceConfig.Logger.Info("Restarting processor (re-initialize)",
		zap.Int("ID", s.ID),
		zap.String("job", s.jobConfig.Name),
		zap.String("cluster", s.cluster))

	db, err := initSink(s.jobConfig, s.serviceConfig, s.aresControllerClient, s.sinkInitFunc)
	if err != nil {
		err = utils.StackError(err, "Unable to initialize database")
		return err
	}

	// Initialize Kafka consumer
	hlConsumer, err := s.consumerInitFunc(s.jobConfig, s.serviceConfig)
	if err != nil {
		err = utils.StackError(err, "Unable to initialize Kafka consumer")
		db.Shutdown()
		return err
	}

	s.sink = db
	s.highLevelConsumer = hlConsumer
	s.failureHandler = initFailureHandler(s.serviceConfig, s.jobConfig, s.sink)
	s.initBatcher()
	s.shutdown = make(chan bool)

	return nil
}
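// Illustrative sketch, not part of the original file: how a supervising
// goroutine might drive Restart from the shared errors channel. The threshold
// bookkeeping and the processors slice are hypothetical; the real supervisor
// logic lives outside this file. errCh is the channel from the construction
// sketch above.
//
//	go func() {
//		counts := make(map[int]int)
//		for pe := range errCh {
//			counts[pe.ID]++
//			if counts[pe.ID] >= jobConfig.StreamingConfig.ErrorThreshold {
//				counts[pe.ID] = 0
//				go processors[pe.ID].Restart()
//			}
//		}
//	}()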
// Run will start the Processor that reads from the high-level Kafka consumer,
// decodes the messages and adds the rows to the batcher for saving to ares.
func (s *StreamingProcessor) Run() {
	s.serviceConfig.Logger.Info("Starting job",
		zap.String("job", s.jobConfig.Name),
		zap.String("cluster", s.cluster),
		zap.Any("config", s.jobConfig))

	// reset the running flags
	s.context.Lock()
	s.context.Shutdown = false
	s.context.Stopped = false
	s.context.Restarting = false
	s.context.Unlock()

	defer func() {
		s.highLevelConsumer.Close()
		s.batcher.Close()
		s.sink.Shutdown()
		s.context.Lock()
		s.context.Stopped = true
		s.context.Unlock()
	}()

	for {
		select {
		case msg := <-s.highLevelConsumer.Messages():
			msgInSubTS := time.Now()
			if msg != nil {
				// Update message count in context
				s.context.Lock()
				s.context.TotalMessages++
				s.context.Unlock()
				s.scope.Counter("message.totalCount").Inc(1)

				// log message size and report for throttling
				msgLength := int64(len(msg.Value()))
				s.scope.Gauge("message.size").Update(float64(msgLength))
				s.msgSizes <- msgLength

				// decode message and add to batcher for parsing and saving;
				// only touch the message when decoding succeeded, since
				// decodeMessage returns nil on error
				message, err := s.decodeMessage(msg)
				if err == nil {
					message.MsgInSubTS = msgInSubTS
					s.batcher.Add(message, time.Now())
					s.reportMessageAge(message)
				}
			} else {
				s.scope.Counter("errors.kafka.nilMessages").Inc(1)
			}
		case err := <-s.highLevelConsumer.Errors():
			if err != nil {
				s.serviceConfig.Logger.Error("Error reading from consumer",
					zap.String("job", s.jobConfig.Name),
					zap.String("cluster", s.cluster),
					zap.String("name", message.GetFuncName()),
					zap.Error(err))
				s.scope.Counter("errors.kafka.consumer").Inc(1)
			} else {
				s.scope.Counter("errors.kafka.nilErrors").Inc(1)
			}
		case <-s.highLevelConsumer.Closed():
			s.serviceConfig.Logger.Info("Consumer closed. Shutting down processor",
				zap.String("job", s.jobConfig.Name),
				zap.String("cluster", s.cluster))
			return
		case <-s.shutdown:
			s.serviceConfig.Logger.Info("Processor shutdown requested. Shutting down processor",
				zap.String("job", s.jobConfig.Name),
				zap.String("cluster", s.cluster))
			s.context.Lock()
			s.context.Shutdown = true
			s.context.Unlock()
			return
		}
	}
}

// initBatcher will initialize the batcher with 1 worker per processor, so as
// to maintain the order of offset commits
func (s *StreamingProcessor) initBatcher() {
	s.serviceConfig.Logger.Info("Initialize batcher",
		zap.String("job", s.jobConfig.Name),
		zap.String("cluster", s.cluster))

	batcher := tools.NewBatcher(s.jobConfig.StreamingConfig.BatchSize,
		time.Duration(s.jobConfig.StreamingConfig.MaxBatchDelayMS)*time.Millisecond,
		time.Now)
	batcher.StartWorker(s.saveToDB)
	s.batcher = batcher
}

// decodeMessage will decode the given Kafka message and return a Message,
// which contains the actual raw Kafka message, the decoded message and the
// timestamp of the message
func (s *StreamingProcessor) decodeMessage(msg consumer.Message) (*message.Message, error) {
	message, err := s.decoder.DecodeMsg(msg)
	if err != nil {
		s.serviceConfig.Logger.Error("Unable to decode message",
			zap.String("job", s.jobConfig.Name),
			zap.String("cluster", s.cluster),
			zap.Error(err))
		s.scope.Counter("errors.messageDecode").Inc(1)
		return nil, err
	}
	return message, nil
}
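// Illustrative note, not part of the original file: the worker contract that
// initBatcher relies on. tools.Batcher delivers accumulated batches on a
// channel and the worker signals completion through the WaitGroup, mirroring
// saveToDB's signature below; the flush conditions are assumptions inferred
// from the constructor arguments above.
//
//	func worker(batches chan []interface{}, wg *sync.WaitGroup) {
//		defer wg.Done()
//		for batch := range batches {
//			// a batch is presumably flushed once it reaches BatchSize items
//			// or once MaxBatchDelayMS elapses, whichever comes first
//			process(batch)
//		}
//	}
//
// With exactly one worker, batches are saved and their offsets committed
// strictly in arrival order, so CommitUpTo never advances past an unsaved
// message.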
// saveToDestination will parse the given decoded messages based on the
// transformations in the JobConfig and save them to the configured destination
func (s *StreamingProcessor) saveToDestination(batch []interface{}, destination sink.Destination) {
	s.scope.Gauge("batcherBatchSize").Update(float64(len(batch)))

	rows := []client.Row{}
	for _, b := range batch {
		msg := b.(*message.Message).DecodedMessage[message.MsgPrefix].(map[string]interface{})
		if s.parser.IsMessageValid(msg, destination) != nil {
			s.serviceConfig.Logger.Debug("Invalid message", zap.Any("msg", msg))
			continue
		}

		row, err := s.parser.ParseMessage(msg, destination)
		if err == nil &&
			s.parser.CheckPrimaryKeys(destination, row) == nil &&
			s.parser.CheckTimeColumnExistence(
				s.jobConfig.AresTableConfig.Table, s.jobConfig.GetColumnDict(), destination, row) == nil {
			rows = append(rows, row)
		} else {
			s.context.Lock()
			s.context.FailedMessages++
			s.context.LastUpdated = time.Now()
			s.context.Unlock()
		}
	}

	size := len(batch)
	if size > 0 {
		s.scope.Timer("lag.ingestion").Record(time.Since(batch[size-1].(*message.Message).MsgInSubTS))
		s.writeRow(rows, destination)
	}
}

// writeRow saves the parsed rows to the sink, delegating to the failure
// handler on error and recording any unrecoverable failure
func (s *StreamingProcessor) writeRow(rows []client.Row, destination sink.Destination) {
	err := s.sink.Save(destination, rows)
	if err != nil {
		s.serviceConfig.Logger.Error("Unable to save rows to database",
			zap.String("job", s.jobConfig.Name),
			zap.String("cluster", s.cluster),
			zap.String("name", message.GetFuncName()),
			zap.Error(err))
		if s.failureHandler != nil {
			err = s.failureHandler.HandleFailure(destination, rows)
		}
		if err != nil {
			pe := ProcessorError{
				ID:        s.ID,
				Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
				Error:     err,
			}
			s.context.Lock()
			// record the error in the fixed-size ring buffer
			s.context.Errors.errorIdx = (s.context.Errors.errorIdx + 1) % len(s.context.Errors.errors)
			s.context.Errors.errors[s.context.Errors.errorIdx] = pe
			s.context.FailedMessages += int64(len(rows))
			s.context.Unlock()
			s.errors <- pe
		}
	}
	s.context.Lock()
	s.context.LastUpdated = time.Now()
	s.context.Unlock()
}

// saveToDB will parse and save the given batches
func (s *StreamingProcessor) saveToDB(batches chan []interface{}, wg *sync.WaitGroup) {
	for batch := range batches {
		s.saveToDestination(batch, s.parser.Destination)
		// commit the offset of each message only after the batch is saved
		for _, batchObj := range batch {
			msg := batchObj.(*message.Message)
			err := s.highLevelConsumer.CommitUpTo(msg.RawMessage)
			if err == nil {
				s.scope.Counter("message.commited").Inc(1)
			}
		}
	}
	wg.Done()
}

// reportMessageAge will report the message age for the given message
func (s *StreamingProcessor) reportMessageAge(msg *message.Message) {
	if !msg.MsgMetaDataTS.IsZero() {
		s.scope.Tagged(map[string]string{
			"partition": strconv.Itoa(int(msg.RawMessage.Partition())),
		}).Timer("message.age").Record(time.Since(msg.MsgMetaDataTS))
	}
}
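// Illustrative example, not part of the original file: what reportMessageAge
// records. For a message on partition 3 whose MsgMetaDataTS (presumably the
// producer/broker timestamp populated by the decoder) is 250ms in the past,
// the emitted metric is equivalent to:
//
//	scope.Tagged(map[string]string{"partition": "3"}).
//		Timer("message.age").Record(250 * time.Millisecond)
//
// The IsZero check skips messages whose decoder did not populate the
// timestamp.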