kafka/consumer.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
	"context"
	"errors"
	"fmt"
	"math"
	"strings"
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	apmqueue "github.com/elastic/apm-queue/v2"
	"github.com/elastic/apm-queue/v2/queuecontext"
)

var (
	// ErrCommitFailed may be returned by `consumer.Run` when DeliveryType is
	// apmqueue.AtMostOnceDeliveryType.
	ErrCommitFailed = errors.New("kafka: failed to commit offsets")
)

// ConsumerConfig defines the configuration for the Kafka consumer.
type ConsumerConfig struct {
	CommonConfig
	// Topics that the consumer will consume messages from.
	Topics []apmqueue.Topic
	// ConsumeRegex sets the client to parse all topics passed to ConsumeTopics
	// as regular expressions.
	ConsumeRegex bool
	// GroupID to join as part of the consumer group.
	GroupID string
	// MaxPollRecords defines an upper bound to the number of records that can
	// be polled on a single fetch. If MaxPollRecords <= 0, defaults to 500.
	// Note that this setting doesn't change how `franz-go` fetches and buffers
	// events from Kafka brokers, it merely affects the number of records that
	// are returned on `client.PollRecords`.
	// The higher this setting, the higher the general processing throughput
	// will be. However, when Delivery is set to AtMostOnce, the higher this
	// number, the more events are lost if the process crashes or terminates
	// abruptly.
	//
	// It is best to keep the number of polled records small or the consumer
	// risks being forced out of the group if it exceeds rebalance.timeout.ms.
	// Default: 500
	// Kafka consumer setting: max.poll.records
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_max.poll.records
	MaxPollRecords int
	// MaxPollWait defines the maximum amount of time a broker will wait for a
	// fetch response to hit the minimum number of required bytes before
	// returning.
	// Default: 5s
	// Kafka consumer setting: fetch.max.wait.ms
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_fetch.max.wait.ms
	MaxPollWait time.Duration
	//
	// MaxConcurrentFetches sets the maximum number of fetch requests to allow in
	// flight or buffered at once, overriding the unbounded (i.e. number of
	// brokers) default.
	// This setting, paired with FetchMaxBytes, can upper bound the maximum amount
	// of memory that the client can use for consuming.
	// Default: Unbounded, total number of brokers.
	// Docs: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#MaxConcurrentFetches
	MaxConcurrentFetches int
	// MaxPollBytes sets the maximum amount of bytes a broker will try to send
	// during a fetch.
	// Default: 52428800 bytes (~52MB, 50MiB)
	// Kafka consumer setting: fetch.max.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#brokerconfigs_fetch.max.bytes
	MaxPollBytes int32
	// MaxPollPartitionBytes sets the maximum amount of bytes that will be consumed
	// for a single partition in a fetch request.
	// Default: 1048576 bytes (~1MB, 1MiB)
	// Kafka consumer setting: max.partition.fetch.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_max.partition.fetch.bytes
	MaxPollPartitionBytes int32
	// ShutdownGracePeriod defines the maximum amount of time to wait for the
	// partition consumers to process events before the underlying kgo.Client
	// is closed, overriding the default 5s.
	ShutdownGracePeriod time.Duration
	// Delivery mechanism to use to acknowledge the messages.
	// AtMostOnceDeliveryType and AtLeastOnceDeliveryType are supported.
	// If not set, it defaults to apmqueue.AtMostOnceDeliveryType.
	Delivery apmqueue.DeliveryType
	// Processor that will be used to process each event individually.
	// It is recommended to keep the synchronous processing fast and below the
	// rebalance.timeout.ms setting in Kafka.
	//
	// The processing time of each processing cycle can be calculated as:
	// record.process.time * MaxPollRecords.
	Processor apmqueue.Processor
	// FetchMinBytes sets the minimum amount of bytes a broker will try to send
	// during a fetch, overriding the default 1 byte.
	// Default: 1
	// Kafka consumer setting: fetch.min.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_fetch.min.bytes
	FetchMinBytes int32
	// BrokerMaxReadBytes sets the maximum response size that can be read from
	// Kafka, overriding the default 100MiB.
	BrokerMaxReadBytes int32
	// PreferLagFn alters the order in which partitions are consumed.
	// Use with caution, as this can lead to uneven consumption of partitions,
	// and, in the worst-case scenario, to partitions being starved out from
	// being consumed.
	PreferLagFn kgo.PreferLagFn
}

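// As an illustrative sketch (not part of this package's API), a minimal
// ConsumerConfig might look as follows from a caller's perspective. The broker
// address, topic, group ID, and myProcessor are assumptions made for the
// example; the CommonConfig field names (Brokers, Logger) are taken from this
// package's common configuration:
//
//	cfg := kafka.ConsumerConfig{
//		CommonConfig: kafka.CommonConfig{
//			Brokers: []string{"localhost:9092"}, // assumed local broker
//			Logger:  zap.NewExample(),
//		},
//		Topics:    []apmqueue.Topic{"events"},       // hypothetical topic
//		GroupID:   "example-group",                  // hypothetical group
//		Delivery:  apmqueue.AtLeastOnceDeliveryType, // commit after processing
//		Processor: myProcessor,                      // any apmqueue.Processor implementation
//	}
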
// finalize ensures the configuration is valid, setting default values from
// environment variables as described in doc comments, returning an error if
// any configuration is invalid.
func (cfg *ConsumerConfig) finalize() error {
	var errs []error
	if err := cfg.CommonConfig.finalize(); err != nil {
		errs = append(errs, err)
	}
	if len(cfg.Topics) == 0 {
		errs = append(errs, errors.New("kafka: at least one topic must be set"))
	}
	if cfg.GroupID == "" {
		errs = append(errs, errors.New("kafka: consumer GroupID must be set"))
	}
	if cfg.Processor == nil {
		errs = append(errs, errors.New("kafka: processor must be set"))
	}
	if cfg.MaxPollBytes < 0 {
		errs = append(errs, errors.New("kafka: max poll bytes cannot be negative"))
	}
	if cfg.MaxPollPartitionBytes < 0 {
		errs = append(errs, errors.New("kafka: max poll partition bytes cannot be negative"))
	}
	if cfg.FetchMinBytes < 0 {
		errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative"))
	}
	if cfg.BrokerMaxReadBytes < 0 {
		errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative"))
	}
	if cfg.MaxPollPartitionBytes > 1<<30 {
		cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB")
		cfg.MaxPollPartitionBytes = 1 << 30
	}
	if cfg.BrokerMaxReadBytes > 1<<30 {
		cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB")
		cfg.BrokerMaxReadBytes = 1 << 30
	}
	if cfg.MaxPollBytes > 0 {
		// math.MaxInt32 is 1<<31-1.
		if cfg.MaxPollBytes > 1<<30 {
			cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB")
			cfg.MaxPollBytes = 1 << 30
		}
		if cfg.BrokerMaxReadBytes == 0 {
			cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest")
			cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30))
		}
		if cfg.BrokerMaxReadBytes > 0 && cfg.BrokerMaxReadBytes < cfg.MaxPollBytes {
			errs = append(errs, fmt.Errorf(
				"kafka: BrokerMaxReadBytes (%d) cannot be less than MaxPollBytes (%d)",
				cfg.BrokerMaxReadBytes, cfg.MaxPollBytes,
			))
		}
	}
	return errors.Join(errs...)
}

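// A worked example of the defaulting above (the values are illustrative, not
// recommendations): with MaxPollBytes set to 64<<20 (64 MiB) and
// BrokerMaxReadBytes left at zero, finalize sets BrokerMaxReadBytes to
// 128<<20 (128 MiB), since min(64 MiB * 2, 1 GiB) = 128 MiB. With
// MaxPollBytes set to 900<<20 (900 MiB), BrokerMaxReadBytes would instead be
// capped at 1<<30 (1 GiB).
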
var _ apmqueue.Consumer = &Consumer{}

// Consumer wraps a Kafka consumer and the consumption implementation details.
// Consumes each partition in a dedicated goroutine.
type Consumer struct {
	mu         sync.RWMutex
	client     *kgo.Client
	cfg        ConsumerConfig
	consumer   *consumer
	running    chan struct{}
	closed     chan struct{}
	forceClose context.CancelCauseFunc
	stopPoll   context.CancelFunc
	tracer     trace.Tracer
}

// NewConsumer creates a new instance of a Consumer. The consumer will read from
// each partition concurrently by using a dedicated goroutine per partition.
func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
	if err := cfg.finalize(); err != nil {
		return nil, fmt.Errorf("kafka: invalid consumer config: %w", err)
	}
	// `forceClose` is called by `Consumer.Close()` if / when the
	// `cfg.ShutdownGracePeriod` is exceeded.
	processingCtx, forceClose := context.WithCancelCause(context.Background())
	namespacePrefix := cfg.namespacePrefix()
	consumer := &consumer{
		topicPrefix: namespacePrefix,
		logFieldFn:  cfg.TopicLogFieldFunc,
		assignments: make(map[topicPartition]*pc),
		processor:   cfg.Processor,
		logger:      cfg.Logger.Named("partition"),
		delivery:    cfg.Delivery,
		ctx:         processingCtx,
	}
	topics := make([]string, len(cfg.Topics))
	for i, topic := range cfg.Topics {
		topics[i] = fmt.Sprintf("%s%s", consumer.topicPrefix, topic)
	}
	opts := []kgo.Opt{
		// Injects the kgo.Client context as the record.Context.
		kgo.WithHooks(consumer),
		kgo.ConsumerGroup(cfg.GroupID),
		kgo.ConsumeTopics(topics...),
		// If a rebalance happens while the client is polling, the consumed
		// records may belong to a partition which has been reassigned to a
		// different consumer in the group. To avoid this scenario, polls will
		// block rebalances of partitions which would be lost, and the consumer
		// MUST manually call `AllowRebalance`.
		kgo.BlockRebalanceOnPoll(),
		kgo.DisableAutoCommit(),
		// Assign concurrent consumer callbacks to ensure consuming starts
		// for newly assigned partitions, and consuming ceases from lost or
		// revoked partitions.
		kgo.OnPartitionsAssigned(consumer.assigned),
		kgo.OnPartitionsLost(consumer.lost),
		kgo.OnPartitionsRevoked(consumer.lost),
	}
	if cfg.ConsumeRegex {
		opts = append(opts, kgo.ConsumeRegex())
	}
	if cfg.MaxPollWait > 0 {
		opts = append(opts, kgo.FetchMaxWait(cfg.MaxPollWait))
	}
	if cfg.MaxPollBytes != 0 {
		opts = append(opts, kgo.FetchMaxBytes(cfg.MaxPollBytes))
	}
	if cfg.MaxPollPartitionBytes != 0 {
		opts = append(opts, kgo.FetchMaxPartitionBytes(cfg.MaxPollPartitionBytes))
	}
	if cfg.MaxConcurrentFetches > 0 {
		opts = append(opts, kgo.MaxConcurrentFetches(cfg.MaxConcurrentFetches))
	}
	if cfg.PreferLagFn != nil {
		opts = append(opts, kgo.ConsumePreferringLagFn(cfg.PreferLagFn))
	}
	if cfg.ShutdownGracePeriod <= 0 {
		cfg.ShutdownGracePeriod = 5 * time.Second
	}
	if cfg.FetchMinBytes > 0 {
		opts = append(opts, kgo.FetchMinBytes(cfg.FetchMinBytes))
	}
	if cfg.BrokerMaxReadBytes > 0 {
		opts = append(opts, kgo.BrokerMaxReadBytes(cfg.BrokerMaxReadBytes))
	}
	client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating kafka consumer: %w", err)
	}
	if cfg.MaxPollRecords <= 0 {
		cfg.MaxPollRecords = 500
	}
	return &Consumer{
		cfg:        cfg,
		client:     client,
		consumer:   consumer,
		closed:     make(chan struct{}),
		running:    make(chan struct{}),
		forceClose: forceClose,
		stopPoll:   func() {},
		tracer:     cfg.tracerProvider().Tracer("kafka"),
	}, nil
}

// Close the consumer, blocking until all partition consumers are stopped.
func (c *Consumer) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	select {
	case <-c.closed:
	default:
		close(c.closed)
		defer c.client.CloseAllowingRebalance() // Last, close the `kgo.Client`.
		// Cancel the context used in client.PollRecords, triggering graceful
		// cancellation.
		c.stopPoll()
		stopped := make(chan struct{})
		go func() {
			defer close(stopped)
			// Close all partition consumers first to ensure there aren't any
			// records being processed while the kgo.Client is being closed.
			// Also ensures that commits can be issued after the records are
			// processed when AtLeastOnceDelivery is configured.
			c.consumer.close()
		}()
		// Wait for the consumers to process any in-flight records, or cancel
		// the underlying processing context if they aren't stopped in time.
		select {
		case <-time.After(c.cfg.ShutdownGracePeriod):
			// Timeout.
			c.forceClose(fmt.Errorf(
				"consumer: close: timeout waiting for consumers to stop (%s)",
				c.cfg.ShutdownGracePeriod.String(),
			))
		case <-stopped:
			// Stopped within c.cfg.ShutdownGracePeriod.
		}
	}
	return nil
}

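// A minimal run/shutdown sketch, assuming the caller owns signal handling (the
// signal wiring and error handling below are illustrative assumptions, not
// requirements of this package):
//
//	consumer, err := kafka.NewConsumer(cfg)
//	if err != nil {
//		log.Fatal(err)
//	}
//	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer stop()
//	go func() {
//		<-ctx.Done()
//		// Close() is called even though the context is canceled, so that
//		// in-flight records are processed (and committed with ALOD) before
//		// the client is closed.
//		consumer.Close()
//	}()
//	if err := consumer.Run(ctx); err != nil {
//		// e.g. ErrCommitFailed with AtMostOnceDelivery; restart or exit.
//		log.Fatal(err)
//	}
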
// Run the consumer until a non-recoverable error is found:
//   - ErrCommitFailed.
//
// To shut down the consumer, call consumer.Close() or cancel the context.
// Calling `consumer.Close` is advisable to ensure graceful shutdown and
// avoid any records from being lost (AMOD), or processed twice (ALOD).
// To ensure that all polled records are processed, Close() must be called,
// even when the context is canceled.
//
// If called more than once, returns `apmqueue.ErrConsumerAlreadyRunning`.
func (c *Consumer) Run(ctx context.Context) error {
	c.mu.Lock()
	select {
	case <-c.running:
		c.mu.Unlock()
		return apmqueue.ErrConsumerAlreadyRunning
	default:
		close(c.running)
	}
	// Create a new context from the passed context, used exclusively for
	// kgo.Client.* calls. c.stopPoll is called by consumer.Close() to
	// cancel this context as part of the graceful shutdown sequence.
	var clientCtx context.Context
	clientCtx, c.stopPoll = context.WithCancel(ctx)
	c.mu.Unlock()
	for {
		if err := c.fetch(clientCtx); err != nil {
			if errors.Is(err, context.Canceled) {
				return nil // Return no error if err == context.Canceled.
			}
			return fmt.Errorf("cannot fetch records: %w", err)
		}
	}
}

// fetch polls the Kafka broker for new records up to cfg.MaxPollRecords.
// Any errors returned by fetch should be considered fatal.
func (c *Consumer) fetch(ctx context.Context) error {
	fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords)
	defer c.client.AllowRebalance()

	if fetches.IsClientClosed() ||
		errors.Is(fetches.Err0(), context.Canceled) ||
		errors.Is(fetches.Err0(), context.DeadlineExceeded) {
		return context.Canceled
	}

	c.mu.RLock()
	defer c.mu.RUnlock()

	switch c.cfg.Delivery {
	case apmqueue.AtLeastOnceDeliveryType:
		// Committing the processed records happens on each partition consumer.
	case apmqueue.AtMostOnceDeliveryType:
		// Commit the fetched record offsets as soon as we've polled them.
		if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
			logger := c.cfg.Logger
			logger.Error("consumer commit offsets returned error", zap.Error(err))
			// NOTE(marclop): If the commit fails with an unrecoverable error,
			// return it and terminate the consumer. This will avoid potentially
			// processing records twice, and it's up to the caller to re-start
			// the consumer.
			return ErrCommitFailed
		}
		// Allow re-balancing now that we have committed offsets, preventing
		// another consumer from reprocessing the records.
		c.client.AllowRebalance()
	}
	fetches.EachError(func(t string, p int32, err error) {
		topicName := strings.TrimPrefix(t, c.consumer.topicPrefix)
		logger := c.cfg.Logger
		if c.cfg.TopicLogFieldFunc != nil {
			logger = logger.With(c.cfg.TopicLogFieldFunc(topicName))
		}
		logger.Error(
			"consumer fetches returned error",
			zap.Error(err),
			zap.String("topic", topicName),
			zap.Int32("partition", p),
		)
	})
	c.consumer.processFetch(fetches)
	return nil
}

// Healthy returns an error if the Kafka client fails to reach a discovered
// broker.
func (c *Consumer) Healthy(ctx context.Context) error {
	if err := c.client.Ping(ctx); err != nil {
		return fmt.Errorf("health probe: %w", err)
	}
	return nil
}

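// Healthy can be wired into a liveness/readiness probe; a sketch, assuming a
// plain net/http handler (the route and timeout are illustrative assumptions):
//
//	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
//		ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
//		defer cancel()
//		if err := consumer.Healthy(ctx); err != nil {
//			http.Error(w, err.Error(), http.StatusServiceUnavailable)
//			return
//		}
//		w.WriteHeader(http.StatusOK)
//	})
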
// consumer wraps partitionConsumers and exposes the necessary callbacks
// to use when partitions are reassigned.
type consumer struct {
	mu          sync.RWMutex
	topicPrefix string
	assignments map[topicPartition]*pc
	processor   apmqueue.Processor
	logger      *zap.Logger
	delivery    apmqueue.DeliveryType
	logFieldFn  TopicLogFieldFunc
	// ctx contains the graceful cancellation context that is passed to the
	// partition consumers.
	ctx context.Context
}

type topicPartition struct {
	topic     string
	partition int32
}

// assigned must be set as a kgo.OnPartitionsAssigned callback. It ensures that
// records are processed for all partitions assigned to this consumer.
func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[string][]int32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for topic, partitions := range assigned {
		for _, partition := range partitions {
			t := strings.TrimPrefix(topic, c.topicPrefix)
			logger := c.logger.With(
				zap.String("topic", t),
				zap.Int32("partition", partition),
			)
			if c.logFieldFn != nil {
				logger = logger.With(c.logFieldFn(t))
			}
			pc := newPartitionConsumer(c.ctx, client, c.processor, c.delivery,
				t, logger,
			)
			c.assignments[topicPartition{topic: topic, partition: partition}] = pc
		}
	}
}

// lost must be set as the kgo.OnPartitionsLost and kgo.OnPartitionsRevoked
// callbacks. Ensures that partitions that are lost (see kgo.OnPartitionsLost
// for more details) or revoked (see kgo.OnPartitionsRevoked for more
// details) have their partition consumer stopped.
// This callback must finish within the re-balance timeout.
func (c *consumer) lost(_ context.Context, _ *kgo.Client, lost map[string][]int32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var wg sync.WaitGroup
	for topic, partitions := range lost {
		for _, partition := range partitions {
			tp := topicPartition{topic: topic, partition: partition}
			if consumer, ok := c.assignments[tp]; ok {
				wg.Add(1)
				go func() {
					defer wg.Done()
					consumer.wait()
				}()
			}
			delete(c.assignments, tp)
		}
	}
	wg.Wait()
}

// close is used to initiate a clean shutdown. This call blocks until all the
// partition consumers have processed their records and stopped.
//
// It holds the write lock, which cannot be acquired until the last fetch of
// records has been sent to all the partition consumers.
func (c *consumer) close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	var wg sync.WaitGroup
	for tp, consumer := range c.assignments {
		delete(c.assignments, tp)
		wg.Add(1)
		go func(c *pc) {
			defer wg.Done()
			c.wait()
		}(consumer)
	}
	wg.Wait()
}

// processFetch sends the received records for a partition to the corresponding
// partition consumer. If the topic/partition combination can't be found in the
// consumer map, the consumer has been closed.
//
// It holds the consumer read lock, which cannot be acquired if the consumer is
// closing.
func (c *consumer) processFetch(fetches kgo.Fetches) {
	if fetches.NumRecords() == 0 {
		return
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	fetches.EachPartition(func(ftp kgo.FetchTopicPartition) {
		if len(ftp.Records) == 0 {
			return
		}
		consumer, ok := c.assignments[topicPartition{topic: ftp.Topic, partition: ftp.Partition}]
		if ok {
			consumer.consumeRecords(ftp)
			return
		}
		// NOTE(marclop) While possible, this is unlikely to happen given the
		// locking that's in place in the caller.
		if c.delivery == apmqueue.AtMostOnceDeliveryType {
			topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix)
			logger := c.logger
			if c.logFieldFn != nil {
				logger = logger.With(c.logFieldFn(topicName))
			}
			logger.Warn(
				"data loss: failed to send records to process after commit",
				zap.Error(errors.New(
					"attempted to process records for revoked partition",
				)),
				zap.String("topic", topicName),
				zap.Int32("partition", ftp.Partition),
				zap.Int64("offset", ftp.HighWatermark),
				zap.Int("records", len(ftp.Records)),
			)
		}
	})
}

// OnFetchRecordBuffered implements the kgo.Hook that injects the processing
// context that is canceled by `Consumer.Close()`.
func (c *consumer) OnFetchRecordBuffered(r *kgo.Record) {
	if r.Context == nil {
		r.Context = c.ctx
	}
}

type pc struct {
	topic     apmqueue.Topic
	g         errgroup.Group
	logger    *zap.Logger
	delivery  apmqueue.DeliveryType
	processor apmqueue.Processor
	client    *kgo.Client
	ctx       context.Context
}

func newPartitionConsumer(ctx context.Context,
	client *kgo.Client,
	processor apmqueue.Processor,
	delivery apmqueue.DeliveryType,
	topic string,
	logger *zap.Logger,
) *pc {
	c := pc{
		topic:     apmqueue.Topic(topic),
		ctx:       ctx,
		client:    client,
		processor: processor,
		delivery:  delivery,
		logger:    logger,
	}
	// Only allow calls to processor.Process to happen serially.
	c.g.SetLimit(1)
	return &c
}

// consumeRecords processes the records for a topic and partition. The
// records will be processed asynchronously.
func (c *pc) consumeRecords(ftp kgo.FetchTopicPartition) {
	c.g.Go(func() error {
		// Stores the index of the last processed record. Defaults to -1 for
		// cases where only the first record is received and it fails processing.
		last := -1
		for i, msg := range ftp.Records {
			meta := make(map[string]string, len(msg.Headers))
			for _, h := range msg.Headers {
				meta[h.Key] = string(h.Value)
			}
			processCtx := queuecontext.WithMetadata(msg.Context, meta)
			record := apmqueue.Record{
				Topic:       c.topic,
				Partition:   msg.Partition,
				OrderingKey: msg.Key,
				Value:       msg.Value,
			}
			// If a record can't be processed, no retries are attempted and it
			// may be lost. https://github.com/elastic/apm-queue/issues/118.
			if err := c.processor.Process(processCtx, record); err != nil {
				c.logger.Error("data loss: unable to process event",
					zap.Error(err),
					zap.Int64("offset", msg.Offset),
					zap.Any("headers", meta),
				)
				switch c.delivery {
				case apmqueue.AtLeastOnceDeliveryType:
					continue
				}
			}
			last = i
		}
		// Commit the last record offset when one or more records are processed
		// and the delivery guarantee is set to AtLeastOnceDeliveryType.
		if c.delivery == apmqueue.AtLeastOnceDeliveryType && last >= 0 {
			lastRecord := ftp.Records[last]
			if err := c.client.CommitRecords(c.ctx, lastRecord); err != nil {
				c.logger.Error("unable to commit records",
					zap.Error(err),
					zap.Int64("offset", lastRecord.Offset),
				)
			} else if len(ftp.Records) > 0 {
				c.logger.Info("committed",
					zap.Int64("offset", lastRecord.Offset),
				)
			}
		}
		return nil
	})
}

// wait blocks until all the records have been processed.
func (c *pc) wait() error {
	return c.g.Wait()
}
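
// A sketch of a processor that reads the record metadata injected above via
// queuecontext.WithMetadata. The loggingProcessor type is hypothetical, the
// single-record Process signature is inferred from the call in consumeRecords,
// and a queuecontext.MetadataFromContext accessor is assumed to exist; consult
// the apmqueue and queuecontext packages for the authoritative interfaces:
//
//	type loggingProcessor struct{ logger *zap.Logger }
//
//	func (p loggingProcessor) Process(ctx context.Context, r apmqueue.Record) error {
//		// Metadata propagated from the Kafka record headers (assumed accessor).
//		meta, _ := queuecontext.MetadataFromContext(ctx)
//		p.logger.Info("processing record",
//			zap.String("topic", string(r.Topic)),
//			zap.Int32("partition", r.Partition),
//			zap.Any("metadata", meta),
//		)
//		return nil
//	}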