receiver/kafkareceiver/kafka_receiver.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"

import (
	"context"
	"errors"
	"strconv"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/cenkalti/backoff/v4"
	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configretry"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/receiverhelper"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata"
)

const (
	transport = "kafka"
	// TODO: update the following attributes to reflect semconv
	attrInstanceName = "name"
	attrPartition    = "partition"
)

var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage")

// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
	config            Config
	consumerGroup     sarama.ConsumerGroup
	nextConsumer      consumer.Traces
	topics            []string
	cancelConsumeLoop context.CancelFunc
	unmarshaler       ptrace.Unmarshaler
	consumeLoopWG     *sync.WaitGroup
	settings          receiver.Settings
	telemetryBuilder  *metadata.TelemetryBuilder
	autocommitEnabled bool
	messageMarking    MessageMarking
	headerExtraction  bool
	headers           []string
	minFetchSize      int32
	defaultFetchSize  int32
	maxFetchSize      int32
}

// kafkaMetricsConsumer uses sarama to consume and handle messages from kafka.
type kafkaMetricsConsumer struct {
	config            Config
	consumerGroup     sarama.ConsumerGroup
	nextConsumer      consumer.Metrics
	topics            []string
	cancelConsumeLoop context.CancelFunc
	unmarshaler       pmetric.Unmarshaler
	consumeLoopWG     *sync.WaitGroup
	settings          receiver.Settings
	telemetryBuilder  *metadata.TelemetryBuilder
	autocommitEnabled bool
	messageMarking    MessageMarking
	headerExtraction  bool
	headers           []string
	minFetchSize      int32
	defaultFetchSize  int32
	maxFetchSize      int32
}
// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
type kafkaLogsConsumer struct {
	config            Config
	consumerGroup     sarama.ConsumerGroup
	nextConsumer      consumer.Logs
	topics            []string
	cancelConsumeLoop context.CancelFunc
	unmarshaler       plog.Unmarshaler
	consumeLoopWG     *sync.WaitGroup
	settings          receiver.Settings
	telemetryBuilder  *metadata.TelemetryBuilder
	autocommitEnabled bool
	messageMarking    MessageMarking
	headerExtraction  bool
	headers           []string
	minFetchSize      int32
	defaultFetchSize  int32
	maxFetchSize      int32
}

var (
	_ receiver.Traces  = (*kafkaTracesConsumer)(nil)
	_ receiver.Metrics = (*kafkaMetricsConsumer)(nil)
	_ receiver.Logs    = (*kafkaLogsConsumer)(nil)
)

func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
	telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
	if err != nil {
		return nil, err
	}
	return &kafkaTracesConsumer{
		config:            config,
		topics:            []string{config.Topic},
		nextConsumer:      nextConsumer,
		consumeLoopWG:     &sync.WaitGroup{},
		settings:          set,
		autocommitEnabled: config.AutoCommit.Enable,
		messageMarking:    config.MessageMarking,
		headerExtraction:  config.HeaderExtraction.ExtractHeaders,
		headers:           config.HeaderExtraction.Headers,
		telemetryBuilder:  telemetryBuilder,
		minFetchSize:      config.MinFetchSize,
		defaultFetchSize:  config.DefaultFetchSize,
		maxFetchSize:      config.MaxFetchSize,
	}, nil
}

func createKafkaClient(ctx context.Context, config Config) (sarama.ConsumerGroup, error) {
	return kafka.NewSaramaConsumerGroup(ctx, config.ClientConfig, config.ConsumerConfig)
}

func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancelConsumeLoop = cancel
	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
		ReceiverID:             c.settings.ID,
		Transport:              transport,
		ReceiverCreateSettings: c.settings,
	})
	if err != nil {
		return err
	}
	unmarshaler, err := newTracesUnmarshaler(c.config.Encoding, c.settings, host)
	if err != nil {
		return err
	}
	c.unmarshaler = unmarshaler
	// consumerGroup may be set in tests to inject fake implementation.
	if c.consumerGroup == nil {
		if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil {
			return err
		}
	}
	consumerGroup := &tracesConsumerGroupHandler{
		logger:            c.settings.Logger,
		encoding:          c.config.Encoding,
		unmarshaler:       c.unmarshaler,
		nextConsumer:      c.nextConsumer,
		ready:             make(chan bool),
		obsrecv:           obsrecv,
		autocommitEnabled: c.autocommitEnabled,
		messageMarking:    c.messageMarking,
		headerExtractor:   &nopHeaderExtractor{},
		telemetryBuilder:  c.telemetryBuilder,
		backOff:           newExponentialBackOff(c.config.ErrorBackOff),
	}
	if c.headerExtraction {
		consumerGroup.headerExtractor = &headerExtractor{
			logger:  c.settings.Logger,
			headers: c.headers,
		}
	}
	c.consumeLoopWG.Add(1)
	go c.consumeLoop(ctx, consumerGroup)
	<-consumerGroup.ready
	return nil
}

func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
	defer c.consumeLoopWG.Done()
	for {
		// `Consume` should be called inside an infinite loop; when a
		// server-side rebalance happens, the consumer session needs to be
		// recreated to get the new claims.
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.settings.Logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return
		}
	}
}

func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
	if c.cancelConsumeLoop == nil {
		return nil
	}
	c.cancelConsumeLoop()
	c.consumeLoopWG.Wait()
	if c.consumerGroup == nil {
		return nil
	}
	return c.consumerGroup.Close()
}

func newMetricsReceiver(config Config, set receiver.Settings, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) {
	telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
	if err != nil {
		return nil, err
	}
	return &kafkaMetricsConsumer{
		config:            config,
		topics:            []string{config.Topic},
		nextConsumer:      nextConsumer,
		consumeLoopWG:     &sync.WaitGroup{},
		settings:          set,
		autocommitEnabled: config.AutoCommit.Enable,
		messageMarking:    config.MessageMarking,
		headerExtraction:  config.HeaderExtraction.ExtractHeaders,
		headers:           config.HeaderExtraction.Headers,
		telemetryBuilder:  telemetryBuilder,
		minFetchSize:      config.MinFetchSize,
		defaultFetchSize:  config.DefaultFetchSize,
		maxFetchSize:      config.MaxFetchSize,
	}, nil
}

func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) error {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancelConsumeLoop = cancel
	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
		ReceiverID:             c.settings.ID,
		Transport:              transport,
		ReceiverCreateSettings: c.settings,
	})
	if err != nil {
		return err
	}
	unmarshaler, err := newMetricsUnmarshaler(c.config.Encoding, c.settings, host)
	if err != nil {
		return err
	}
	c.unmarshaler = unmarshaler
	// consumerGroup may be set in tests to inject fake implementation.
	if c.consumerGroup == nil {
		if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil {
			return err
		}
	}
	metricsConsumerGroup := &metricsConsumerGroupHandler{
		logger:            c.settings.Logger,
		encoding:          c.config.Encoding,
		unmarshaler:       c.unmarshaler,
		nextConsumer:      c.nextConsumer,
		ready:             make(chan bool),
		obsrecv:           obsrecv,
		autocommitEnabled: c.autocommitEnabled,
		messageMarking:    c.messageMarking,
		headerExtractor:   &nopHeaderExtractor{},
		telemetryBuilder:  c.telemetryBuilder,
		backOff:           newExponentialBackOff(c.config.ErrorBackOff),
	}
	if c.headerExtraction {
		metricsConsumerGroup.headerExtractor = &headerExtractor{
			logger:  c.settings.Logger,
			headers: c.headers,
		}
	}
	c.consumeLoopWG.Add(1)
	go c.consumeLoop(ctx, metricsConsumerGroup)
	<-metricsConsumerGroup.ready
	return nil
}

func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
	defer c.consumeLoopWG.Done()
	for {
		// `Consume` should be called inside an infinite loop; when a
		// server-side rebalance happens, the consumer session needs to be
		// recreated to get the new claims.
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.settings.Logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return
		}
	}
}

func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
	if c.cancelConsumeLoop == nil {
		return nil
	}
	c.cancelConsumeLoop()
	c.consumeLoopWG.Wait()
	if c.consumerGroup == nil {
		return nil
	}
	return c.consumerGroup.Close()
}

func newLogsReceiver(config Config, set receiver.Settings, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
	telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
	if err != nil {
		return nil, err
	}
	return &kafkaLogsConsumer{
		config:            config,
		topics:            []string{config.Topic},
		nextConsumer:      nextConsumer,
		consumeLoopWG:     &sync.WaitGroup{},
		settings:          set,
		autocommitEnabled: config.AutoCommit.Enable,
		messageMarking:    config.MessageMarking,
		headerExtraction:  config.HeaderExtraction.ExtractHeaders,
		headers:           config.HeaderExtraction.Headers,
		telemetryBuilder:  telemetryBuilder,
		minFetchSize:      config.MinFetchSize,
		defaultFetchSize:  config.DefaultFetchSize,
		maxFetchSize:      config.MaxFetchSize,
	}, nil
}

func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancelConsumeLoop = cancel
	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
		ReceiverID:             c.settings.ID,
		Transport:              transport,
		ReceiverCreateSettings: c.settings,
	})
	if err != nil {
		return err
	}
	unmarshaler, err := newLogsUnmarshaler(c.config.Encoding, c.settings, host)
	if err != nil {
		return err
	}
	c.unmarshaler = unmarshaler
	// consumerGroup may be set in tests to inject fake implementation.
	if c.consumerGroup == nil {
		if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil {
			return err
		}
	}
	logsConsumerGroup := &logsConsumerGroupHandler{
		logger:            c.settings.Logger,
		encoding:          c.config.Encoding,
		unmarshaler:       c.unmarshaler,
		nextConsumer:      c.nextConsumer,
		ready:             make(chan bool),
		obsrecv:           obsrecv,
		autocommitEnabled: c.autocommitEnabled,
		messageMarking:    c.messageMarking,
		headerExtractor:   &nopHeaderExtractor{},
		telemetryBuilder:  c.telemetryBuilder,
		backOff:           newExponentialBackOff(c.config.ErrorBackOff),
	}
	if c.headerExtraction {
		logsConsumerGroup.headerExtractor = &headerExtractor{
			logger:  c.settings.Logger,
			headers: c.headers,
		}
	}
	c.consumeLoopWG.Add(1)
	go c.consumeLoop(ctx, logsConsumerGroup)
	<-logsConsumerGroup.ready
	return nil
}

func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
	defer c.consumeLoopWG.Done()
	for {
		// `Consume` should be called inside an infinite loop; when a
		// server-side rebalance happens, the consumer session needs to be
		// recreated to get the new claims.
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.settings.Logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return
		}
	}
}

func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
	if c.cancelConsumeLoop == nil {
		return nil
	}
	c.cancelConsumeLoop()
	c.consumeLoopWG.Wait()
	if c.consumerGroup == nil {
		return nil
	}
	return c.consumerGroup.Close()
}

type tracesConsumerGroupHandler struct {
	id                component.ID
	encoding          string
	unmarshaler       ptrace.Unmarshaler
	nextConsumer      consumer.Traces
	ready             chan bool
	readyCloser       sync.Once
	logger            *zap.Logger
	obsrecv           *receiverhelper.ObsReport
	telemetryBuilder  *metadata.TelemetryBuilder
	autocommitEnabled bool
	messageMarking    MessageMarking
	headerExtractor   HeaderExtractor
	backOff           *backoff.ExponentialBackOff
	backOffMutex      sync.Mutex
}

type metricsConsumerGroupHandler struct {
	id                component.ID
	encoding          string
	unmarshaler       pmetric.Unmarshaler
	nextConsumer      consumer.Metrics
	ready             chan bool
	readyCloser       sync.Once
	logger            *zap.Logger
	obsrecv           *receiverhelper.ObsReport
	telemetryBuilder  *metadata.TelemetryBuilder
	autocommitEnabled bool
	messageMarking    MessageMarking
	headerExtractor   HeaderExtractor
	backOff           *backoff.ExponentialBackOff
	backOffMutex      sync.Mutex
}

type logsConsumerGroupHandler struct {
	id                component.ID
	encoding          string
	unmarshaler       plog.Unmarshaler
	nextConsumer      consumer.Logs
	ready             chan bool
	readyCloser       sync.Once
	logger            *zap.Logger
	obsrecv           *receiverhelper.ObsReport
	telemetryBuilder  *metadata.TelemetryBuilder
	autocommitEnabled bool
	messageMarking    MessageMarking
	headerExtractor   HeaderExtractor
	backOff           *backoff.ExponentialBackOff
	backOffMutex      sync.Mutex
}

var (
	_ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
	_ sarama.ConsumerGroupHandler = (*metricsConsumerGroupHandler)(nil)
	_ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil)
)

func (c *tracesConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	c.readyCloser.Do(func() {
		close(c.ready)
	})
	c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())))
	return nil
}
func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1,
		metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())))
	return nil
}

func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
	if !c.autocommitEnabled {
		defer session.Commit()
	}
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			c.logger.Debug("Kafka message claimed",
				zap.String("value", string(message.Value)),
				zap.Time("timestamp", message.Timestamp),
				zap.String("topic", message.Topic))
			if !c.messageMarking.After {
				session.MarkMessage(message, "")
			}

			// If the Kafka exporter has propagated headers in the message,
			// create a new context with client.Info in it.
			ctx := newContextWithHeaders(session.Context(), message.Headers)
			ctx = c.obsrecv.StartTracesOp(ctx)
			attrs := attribute.NewSet(
				attribute.String(attrInstanceName, c.id.String()),
				attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
			)
			c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs))
			c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs))
			c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs))

			traces, err := c.unmarshaler.UnmarshalTraces(message.Value)
			if err != nil {
				c.logger.Error("failed to unmarshal message", zap.Error(err))
				c.telemetryBuilder.KafkaReceiverUnmarshalFailedSpans.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())))
				if c.messageMarking.After && c.messageMarking.OnError {
					session.MarkMessage(message, "")
				}
				return err
			}

			c.headerExtractor.extractHeadersTraces(traces, message)
			spanCount := traces.SpanCount()
			err = c.nextConsumer.ConsumeTraces(ctx, traces)
			c.obsrecv.EndTracesOp(ctx, c.encoding, spanCount, err)
			if err != nil {
				if errorRequiresBackoff(err) && c.backOff != nil {
					backOffDelay := c.getNextBackoff()
					if backOffDelay != backoff.Stop {
						c.logger.Info("Backing off due to error from the next consumer.",
							zap.Error(err),
							zap.Duration("delay", backOffDelay),
							zap.String("topic", message.Topic),
							zap.Int32("partition", claim.Partition()))
						select {
						case <-session.Context().Done():
							return nil
						case <-time.After(backOffDelay):
							if !c.messageMarking.After {
								// Unmark the message so it can be retried
								session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
							}
							return err
						}
					}
					c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached",
						zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime))
				}
				if c.messageMarking.After && c.messageMarking.OnError {
					session.MarkMessage(message, "")
				}
				return err
			}
			if c.backOff != nil {
				c.resetBackoff()
			}
			if c.messageMarking.After {
				session.MarkMessage(message, "")
			}
			if !c.autocommitEnabled {
				session.Commit()
			}

		// Should return when `session.Context()` is done. If not, a Kafka
		// rebalance will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout`
		// errors; see https://github.com/IBM/sarama/issues/1192.
		case <-session.Context().Done():
			return nil
		}
	}
}

func (c *tracesConsumerGroupHandler) getNextBackoff() time.Duration {
	c.backOffMutex.Lock()
	defer c.backOffMutex.Unlock()
	return c.backOff.NextBackOff()
}

func (c *tracesConsumerGroupHandler) resetBackoff() {
	c.backOffMutex.Lock()
	defer c.backOffMutex.Unlock()
	c.backOff.Reset()
}

func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	c.readyCloser.Do(func() {
		close(c.ready)
	})
	c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())))
	return nil
}

func (c *metricsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())))
	return nil
}

func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
	if !c.autocommitEnabled {
		defer session.Commit()
	}
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			c.logger.Debug("Kafka message claimed",
				zap.String("value", string(message.Value)),
				zap.Time("timestamp", message.Timestamp),
				zap.String("topic", message.Topic))
			if !c.messageMarking.After {
				session.MarkMessage(message, "")
			}

			// If the Kafka exporter has propagated headers in the message,
			// create a new context with client.Info in it.
			ctx := newContextWithHeaders(session.Context(), message.Headers)
			ctx = c.obsrecv.StartMetricsOp(ctx)
			attrs := attribute.NewSet(
				attribute.String(attrInstanceName, c.id.String()),
				attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
			)
			c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs))
			c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs))
			c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs))

			metrics, err := c.unmarshaler.UnmarshalMetrics(message.Value)
			if err != nil {
				c.logger.Error("failed to unmarshal message", zap.Error(err))
				c.telemetryBuilder.KafkaReceiverUnmarshalFailedMetricPoints.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())))
				if c.messageMarking.After && c.messageMarking.OnError {
					session.MarkMessage(message, "")
				}
				return err
			}

			c.headerExtractor.extractHeadersMetrics(metrics, message)
			dataPointCount := metrics.DataPointCount()
			err = c.nextConsumer.ConsumeMetrics(ctx, metrics)
			c.obsrecv.EndMetricsOp(ctx, c.encoding, dataPointCount, err)
			if err != nil {
				if errorRequiresBackoff(err) && c.backOff != nil {
					backOffDelay := c.getNextBackoff()
					if backOffDelay != backoff.Stop {
						c.logger.Info("Backing off due to error from the next consumer.",
							zap.Error(err),
							zap.Duration("delay", backOffDelay),
							zap.String("topic", message.Topic),
							zap.Int32("partition", claim.Partition()))
						select {
						case <-session.Context().Done():
							return nil
						case <-time.After(backOffDelay):
							if !c.messageMarking.After {
								// Unmark the message so it can be retried
								session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
							}
							return err
						}
					}
					c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached",
						zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime))
				}
				if c.messageMarking.After && c.messageMarking.OnError {
					session.MarkMessage(message, "")
				}
				return err
			}
			if c.backOff != nil {
				c.resetBackoff()
			}
			if c.messageMarking.After {
				session.MarkMessage(message, "")
			}
			if !c.autocommitEnabled {
				session.Commit()
			}

		// Should return when `session.Context()` is done. If not, a Kafka
		// rebalance will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout`
		// errors; see https://github.com/IBM/sarama/issues/1192.
		case <-session.Context().Done():
			return nil
		}
	}
}

func (c *metricsConsumerGroupHandler) getNextBackoff() time.Duration {
	c.backOffMutex.Lock()
	defer c.backOffMutex.Unlock()
	return c.backOff.NextBackOff()
}

func (c *metricsConsumerGroupHandler) resetBackoff() {
	c.backOffMutex.Lock()
	defer c.backOffMutex.Unlock()
	c.backOff.Reset()
}

func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	c.readyCloser.Do(func() {
		close(c.ready)
	})
	c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())))
	return nil
}

func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())))
	return nil
}

func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
	if !c.autocommitEnabled {
		defer session.Commit()
	}
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			c.logger.Debug("Kafka message claimed",
				zap.String("value", string(message.Value)),
				zap.Time("timestamp", message.Timestamp),
				zap.String("topic", message.Topic))
			if !c.messageMarking.After {
				session.MarkMessage(message, "")
			}

			// If the Kafka exporter has propagated headers in the message,
			// create a new context with client.Info in it.
			ctx := newContextWithHeaders(session.Context(), message.Headers)
			ctx = c.obsrecv.StartLogsOp(ctx)
			attrs := attribute.NewSet(
				attribute.String(attrInstanceName, c.id.String()),
				attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
			)
			c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs))
			c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs))
			c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs))

			logs, err := c.unmarshaler.UnmarshalLogs(message.Value)
			if err != nil {
				c.logger.Error("failed to unmarshal message", zap.Error(err))
				c.telemetryBuilder.KafkaReceiverUnmarshalFailedLogRecords.Add(ctx, 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())))
				if c.messageMarking.After && c.messageMarking.OnError {
					session.MarkMessage(message, "")
				}
				return err
			}

			c.headerExtractor.extractHeadersLogs(logs, message)
			logRecordCount := logs.LogRecordCount()
			err = c.nextConsumer.ConsumeLogs(ctx, logs)
			c.obsrecv.EndLogsOp(ctx, c.encoding, logRecordCount, err)
			if err != nil {
				if errorRequiresBackoff(err) && c.backOff != nil {
					backOffDelay := c.getNextBackoff()
					if backOffDelay != backoff.Stop {
						c.logger.Info("Backing off due to error from the next consumer.",
							zap.Error(err),
							zap.Duration("delay", backOffDelay),
							zap.String("topic", message.Topic),
							zap.Int32("partition", claim.Partition()))
						select {
						case <-session.Context().Done():
							return nil
						case <-time.After(backOffDelay):
							if !c.messageMarking.After {
								// Unmark the message so it can be retried
								session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
							}
							return err
						}
					}
					c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached",
						zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime))
				}
				if c.messageMarking.After && c.messageMarking.OnError {
					session.MarkMessage(message, "")
				}
				return err
			}
			if c.backOff != nil {
				c.resetBackoff()
			}
			if c.messageMarking.After {
				session.MarkMessage(message, "")
			}
			if !c.autocommitEnabled {
				session.Commit()
			}

		// Should return when `session.Context()` is done. If not, a Kafka
		// rebalance will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout`
		// errors; see https://github.com/IBM/sarama/issues/1192.
		case <-session.Context().Done():
			return nil
		}
	}
}

func (c *logsConsumerGroupHandler) getNextBackoff() time.Duration {
	c.backOffMutex.Lock()
	defer c.backOffMutex.Unlock()
	return c.backOff.NextBackOff()
}

func (c *logsConsumerGroupHandler) resetBackoff() {
	c.backOffMutex.Lock()
	defer c.backOffMutex.Unlock()
	c.backOff.Reset()
}

func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
	if !config.Enabled {
		return nil
	}
	backOff := backoff.NewExponentialBackOff()
	backOff.InitialInterval = config.InitialInterval
	backOff.RandomizationFactor = config.RandomizationFactor
	backOff.Multiplier = config.Multiplier
	backOff.MaxInterval = config.MaxInterval
	backOff.MaxElapsedTime = config.MaxElapsedTime
	backOff.Reset()
	return backOff
}

func errorRequiresBackoff(err error) bool {
	return err.Error() == errMemoryLimiterDataRefused.Error()
}

func newContextWithHeaders(ctx context.Context,
	headers []*sarama.RecordHeader,
) context.Context {
	if len(headers) == 0 {
		return ctx
	}
	m := make(map[string][]string, len(headers))
	for _, header := range headers {
		key := string(header.Key)
		value := string(header.Value)
		m[key] = append(m[key], value)
	}
	return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(m)})
}
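
// Illustrative note (not part of the upstream receiver): the client.Info
// metadata that newContextWithHeaders attaches to the context can be read by
// downstream components through the collector's client package. A minimal
// sketch, assuming a hypothetical "tenant" header key propagated by the Kafka
// exporter:
//
//	func tenantFromContext(ctx context.Context) string {
//		// client.FromContext returns the client.Info stored by client.NewContext;
//		// Metadata.Get returns all values recorded for the given header key.
//		if values := client.FromContext(ctx).Metadata.Get("tenant"); len(values) > 0 {
//			return values[0]
//		}
//		return ""
//	}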