subscriber/common/consumer/kafka/kafka.go (284 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package kafka import ( "context" "fmt" "github.com/Shopify/sarama" "github.com/uber-go/tally" "github.com/uber/aresdb/subscriber/common/consumer" "github.com/uber/aresdb/subscriber/common/rules" "github.com/uber/aresdb/subscriber/config" "github.com/uber/aresdb/utils" "go.uber.org/zap" "strconv" "strings" "sync" "time" ) // KafkaConsumer implements Consumer interface type KafkaConsumer struct { sarama.ConsumerGroup *sarama.Config sync.Mutex group string topicArray []string logger *zap.Logger scope tally.Scope msgCh chan consumer.Message // WARNING: The following channels should not be closed by the lib users closeAttempted bool closeCh chan struct{} } // KafkaMessage implements Message interface type KafkaMessage struct { *sarama.ConsumerMessage consumer consumer.Consumer clusterName string session sarama.ConsumerGroupSession } // CGHandler represents a Sarama consumer group handler type CGHandler struct { consumer *KafkaConsumer msgCounter map[string]map[int32]tally.Counter msgByteCounter map[string]map[int32]tally.Counter msgOffsetGauge map[string]map[int32]tally.Gauge msgLagGauge map[string]map[int32]tally.Gauge } // GetConsumerGroupName will return the consumer group name to use or being used // for given deployment and job name func GetConsumerGroupName(deployment, jobName string, aresCluster string) string { return fmt.Sprintf("ares-subscriber_%s_%s_%s_streaming", deployment, jobName, aresCluster) } func getKafkaVersion(v string) sarama.KafkaVersion { switch v { case "V0_10_2_0": return sarama.V0_10_2_0 case "V0_10_2_1": return sarama.V0_10_2_1 case "V0_11_0_0": return sarama.V0_11_0_0 case "V0_11_0_1": return sarama.V0_11_0_1 case "V0_11_0_2": return sarama.V0_11_0_2 case "V1_0_0_0": return sarama.V1_0_0_0 case "V1_1_0_0": return sarama.V1_1_0_0 case "V1_1_1_0": return sarama.V1_1_1_0 case "V2_0_0_0": return sarama.V2_0_0_0 case "V2_0_1_0": return sarama.V2_0_1_0 case "V2_1_0_0": return sarama.V2_1_0_0 case "V2_2_0_0": return sarama.V2_2_0_0 default: return sarama.V0_10_2_0 } } // NewKafkaConsumer creates kafka consumer func NewKafkaConsumer(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error) { cfg := sarama.NewConfig() if jobConfig.StreamingConfig.SessionTimeoutMs > 0 { cfg.Consumer.Group.Session.Timeout = time.Duration(jobConfig.StreamingConfig.SessionTimeoutMs) * time.Millisecond } offsetReset := sarama.OffsetOldest if jobConfig.StreamingConfig.LatestOffset { offsetReset = sarama.OffsetNewest } cfg.Consumer.Offsets.Initial = offsetReset cfg.Consumer.Return.Errors = true if jobConfig.StreamingConfig.ReblanceTimeoutSec > 0 { cfg.Consumer.Group.Rebalance.Timeout = time.Duration(jobConfig.StreamingConfig.ReblanceTimeoutSec) * time.Second } cfg.Version = getKafkaVersion(jobConfig.StreamingConfig.KafkaVersion) serviceConfig.Logger.Info("Kafka consumer", zap.String("job", jobConfig.Name), zap.String("broker", jobConfig.StreamingConfig.KafkaBroker), zap.Any("config", cfg)) group := GetConsumerGroupName(serviceConfig.Environment.Deployment, jobConfig.Name, jobConfig.AresTableConfig.Cluster) c, err := sarama.NewConsumerGroup(strings.Split(jobConfig.StreamingConfig.KafkaBroker, ","), group, cfg) if err != nil { return nil, utils.StackError(err, "Unable to initialize Kafka consumer") } logger := serviceConfig.Logger.With( zap.String("kafkaBroker", jobConfig.StreamingConfig.KafkaBroker), zap.String("topic", jobConfig.StreamingConfig.Topic), ) scope := serviceConfig.Scope.Tagged(map[string]string{ "broker": jobConfig.StreamingConfig.KafkaBroker, }) kc := KafkaConsumer{ ConsumerGroup: c, Config: cfg, group: group, topicArray: []string{jobConfig.StreamingConfig.Topic}, logger: logger, scope: scope, msgCh: make(chan consumer.Message, jobConfig.StreamingConfig.ChannelBufferSize), closeCh: make(chan struct{}), } cgHandler := CGHandler{ consumer: &kc, } ctx := context.Background() go kc.startConsuming(ctx, &cgHandler) logger.Info("Consumer is up and running") return &kc, nil } // Name returns the name of this consumer group. func (c *KafkaConsumer) Name() string { return c.group } // Topics returns the names of the topics being consumed. func (c *KafkaConsumer) Topics() []string { return append([]string(nil), c.topicArray...) } // Errors returns a channel of errors for the topic. To prevent deadlocks, // users must read from the error channel. // // All errors returned from this channel can be safely cast to the // consumer.Error interface, which allows structured access to the topic // name and partition number. func (c *KafkaConsumer) Errors() <-chan error { return c.ConsumerGroup.Errors() } // Closed returns a channel that unblocks when the consumer successfully shuts // down. func (c *KafkaConsumer) Closed() <-chan struct{} { return c.closeCh } // SetClosed is used for testing func (c *KafkaConsumer) SetClosed(closeCh chan struct{}) { c.closeCh = closeCh } // Messages returns a channel of messages for the topic. // // If the consumer is not configured with nonzero buffer size, the Errors() // channel must be read in conjunction with Messages() to prevent deadlocks. func (c *KafkaConsumer) Messages() <-chan consumer.Message { return c.msgCh } // SetMessages is used for testing func (c *KafkaConsumer) SetMessages(msgCh chan consumer.Message) { c.msgCh = msgCh } // CommitUpTo marks this message and all previous messages in the same partition // as processed. The last processed offset for each partition is periodically // flushed to ZooKeeper; on startup, consumers begin processing after the last // stored offset. func (c *KafkaConsumer) CommitUpTo(msg consumer.Message) error { if concreteMsg, ok := msg.(*KafkaMessage); ok { if concreteMsg.session != nil { concreteMsg.session.MarkMessage(concreteMsg.ConsumerMessage, "") } else { return fmt.Errorf("Session is nil, msg:%v", msg) } } else { return fmt.Errorf("Failed to convert KafkaMessage, msg:%v", msg) } return nil } func (c *KafkaConsumer) startConsuming(ctx context.Context, cgHandler *CGHandler) { c.logger.Info("Start consumption goroutine") // those four Metrics are of the format {"<topic name>":{<partition id>: <offset>, ...}, ...} cgHandler.msgCounter = make(map[string]map[int32]tally.Counter) cgHandler.msgByteCounter = make(map[string]map[int32]tally.Counter) cgHandler.msgOffsetGauge = make(map[string]map[int32]tally.Gauge) cgHandler.msgLagGauge = make(map[string]map[int32]tally.Gauge) // initialize counter map for _, topic := range c.topicArray { cgHandler.msgCounter[topic] = make(map[int32]tally.Counter) cgHandler.msgByteCounter[topic] = make(map[int32]tally.Counter) cgHandler.msgOffsetGauge[topic] = make(map[int32]tally.Gauge) cgHandler.msgLagGauge[topic] = make(map[int32]tally.Gauge) } for run := true; run; { if err := c.ConsumerGroup.Consume(ctx, c.topicArray, cgHandler); err != nil { c.logger.Error("Received error from consumer", zap.Error(err)) run = false } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { run = false c.logger.Info("Received close Signal") } if !run { c.Close() } } } func (c *KafkaConsumer) processMsg(msg *sarama.ConsumerMessage, cgHandler *CGHandler, highWaterOffset int64, session sarama.ConsumerGroupSession) { c.Lock() defer c.Unlock() c.logger.Debug("Received nessage event", zap.Any("message", msg)) c.msgCh <- &KafkaMessage{ ConsumerMessage: msg, consumer: c, session: session, } topic := msg.Topic partition := msg.Partition pncm := cgHandler.msgCounter[topic] nCounter, ok := pncm[partition] if !ok { nCounter = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Counter("messages-count") pncm[partition] = nCounter } nCounter.Inc(1) pbcm := cgHandler.msgByteCounter[topic] bCounter, ok := pbcm[partition] if !ok { bCounter = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Counter("message-bytes-count") pbcm[partition] = bCounter } bCounter.Inc(int64(len(msg.Value))) pogm := cgHandler.msgOffsetGauge[topic] oGauge, ok := pogm[partition] if !ok { oGauge = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Gauge("latest-offset") pogm[partition] = oGauge } oGauge.Update(float64(msg.Offset)) plgm := cgHandler.msgLagGauge[topic] lGauge, ok := plgm[partition] if !ok { lGauge = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Gauge("offset-lag") } if highWaterOffset > int64(msg.Offset) { lGauge.Update(float64(highWaterOffset - int64(msg.Offset) - 1)) } else { lGauge.Update(0) } } func (c *KafkaConsumer) Close() error { c.Lock() defer c.Unlock() if c.closeAttempted { return fmt.Errorf("Close attempted again on consumer group %s", c.group) } c.logger.Info("Attempting to close consumer", zap.String("consumerGroup", c.group)) err := c.ConsumerGroup.Close() if err != nil { c.logger.Error("Failed to close consumer", zap.String("consumerGroup", c.group), zap.Error(err)) } else { c.logger.Info("Started to close consumer", zap.String("consumerGroup", c.group)) } close(c.closeCh) c.closeAttempted = true return err } func (m *KafkaMessage) Key() []byte { return m.ConsumerMessage.Key } func (m *KafkaMessage) Value() []byte { return m.ConsumerMessage.Value } func (m *KafkaMessage) Topic() string { return m.ConsumerMessage.Topic } func (m *KafkaMessage) Partition() int32 { return m.ConsumerMessage.Partition } func (m *KafkaMessage) Offset() int64 { return m.ConsumerMessage.Offset } func (m *KafkaMessage) Ack() { if m.consumer != nil { m.consumer.CommitUpTo(m) } } func (m *KafkaMessage) Nack() { // No op for now since Kafka based DLQ is not implemented } func (m *KafkaMessage) Cluster() string { return m.clusterName } // Setup is run at the beginning of a new session, before ConsumeClaim func (h *CGHandler) Setup(sarama.ConsumerGroupSession) error { return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (h *CGHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (h *CGHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // NOTE: // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 for message := range claim.Messages() { h.consumer.processMsg(message, h, claim.HighWaterMarkOffset(), session) } return nil }