kafka/producer.go (183 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kafka
import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"go.uber.org/zap"
"github.com/twmb/franz-go/pkg/kgo"
apmqueue "github.com/elastic/apm-queue/v2"
"github.com/elastic/apm-queue/v2/queuecontext"
)
// CompressionCodec configures how records are compressed before being sent.
// Type alias to kgo.CompressionCodec.
type CompressionCodec = kgo.CompressionCodec

// The helpers below re-export the kgo codec constructors so callers can
// populate ProducerConfig.CompressionCodec without importing kgo directly.
// Codecs are used in preference order (see kgo.ProducerBatchCompression).

// NoCompression is a compression option that avoids compression. This can
// always be used as a fallback compression.
func NoCompression() CompressionCodec { return kgo.NoCompression() }

// GzipCompression enables gzip compression with the default compression level.
func GzipCompression() CompressionCodec { return kgo.GzipCompression() }

// SnappyCompression enables snappy compression.
func SnappyCompression() CompressionCodec { return kgo.SnappyCompression() }

// Lz4Compression enables lz4 compression with the fastest compression level.
func Lz4Compression() CompressionCodec { return kgo.Lz4Compression() }

// ZstdCompression enables zstd compression with the default compression level.
func ZstdCompression() CompressionCodec { return kgo.ZstdCompression() }
// ProducerConfig holds configuration for publishing events to Kafka.
type ProducerConfig struct {
	CommonConfig

	// MaxBufferedRecords sets the max amount of records the client will buffer.
	// If zero, the option is not set and the kgo client default applies.
	MaxBufferedRecords int

	// ProducerBatchMaxBytes upper bounds the size of a record batch.
	// If zero, the option is not set and the kgo client default applies.
	ProducerBatchMaxBytes int32

	// ManualFlushing disables auto-flushing when producing.
	ManualFlushing bool

	// Sync can be used to indicate whether production should be synchronous.
	// When true, Produce blocks until every record's produce callback has run.
	Sync bool

	// CompressionCodec specifies a list of compression codecs.
	// See kgo.ProducerBatchCompression for more details.
	//
	// If CompressionCodec is empty, then the default will be set
	// based on $KAFKA_PRODUCER_COMPRESSION_CODEC, which should be
	// a comma-separated list of codec preferences from the list:
	//
	//   [none, gzip, snappy, lz4, zstd]
	//
	// If $KAFKA_PRODUCER_COMPRESSION_CODEC is not specified, then
	// the default behaviour of franz-go is to use [snappy, none].
	CompressionCodec []CompressionCodec

	// ProduceCallback is a hook called after the record has been produced
	// (or failed to produce). Invoked once per record with the terminal error.
	ProduceCallback func(*kgo.Record, error)

	// BatchListener is called per topic/partition after a batch is
	// successfully produced to a Kafka broker.
	BatchListener BatchWriteListener

	// RecordPartitioner is a function that returns the partition to which
	// a record should be sent. If nil, the default partitioner is used.
	RecordPartitioner kgo.Partitioner

	// AllowAutoTopicCreation enables topics to be auto created if they do
	// not exist when fetching their metadata.
	AllowAutoTopicCreation bool
}
// BatchWriteListener specifies a callback function that is invoked after a batch is
// successfully produced to a Kafka broker. It is invoked with the corresponding topic and the
// amount of bytes written to that topic (taking compression into account, when applicable).
type BatchWriteListener func(topic string, bytesWritten int)

// OnProduceBatchWritten implements the kgo.HookProduceBatchWritten interface,
// forwarding the topic and the batch's compressed byte count to the listener.
func (l BatchWriteListener) OnProduceBatchWritten(_ kgo.BrokerMetadata,
	topic string, _ int32, m kgo.ProduceBatchMetrics) {
	l(topic, m.CompressedBytes)
}

// Compile-time check that BatchWriteListener satisfies the kgo hook interface.
var _ kgo.HookProduceBatchWritten = (BatchWriteListener)(nil)
// finalize ensures the configuration is valid, setting default values from
// environment variables as described in doc comments, returning an error if
// any configuration is invalid. All validation errors are accumulated and
// joined, so a single call reports every problem at once.
func (cfg *ProducerConfig) finalize() error {
	var errs []error
	if err := cfg.CommonConfig.finalize(); err != nil {
		errs = append(errs, err)
	}
	if cfg.MaxBufferedRecords < 0 {
		errs = append(errs, fmt.Errorf("kafka: max buffered records cannot be negative: %d", cfg.MaxBufferedRecords))
	}
	if cfg.ProducerBatchMaxBytes < 0 {
		errs = append(errs, fmt.Errorf("kafka: producer batch max bytes cannot be negative: %d", cfg.ProducerBatchMaxBytes))
	}
	// Only consult the environment when no codec preference was set in code.
	if len(cfg.CompressionCodec) == 0 {
		if v := os.Getenv("KAFKA_PRODUCER_COMPRESSION_CODEC"); v != "" {
			names := strings.Split(v, ",")
			codecs := make([]CompressionCodec, 0, len(names))
			for _, name := range names {
				// Tolerate surrounding whitespace, e.g. "snappy, none",
				// which would otherwise be rejected as an unknown codec.
				switch name = strings.TrimSpace(name); name {
				case "none":
					codecs = append(codecs, NoCompression())
				case "gzip":
					codecs = append(codecs, GzipCompression())
				case "snappy":
					codecs = append(codecs, SnappyCompression())
				case "lz4":
					codecs = append(codecs, Lz4Compression())
				case "zstd":
					codecs = append(codecs, ZstdCompression())
				default:
					errs = append(errs, fmt.Errorf("kafka: unknown codec %q", name))
				}
			}
			cfg.CompressionCodec = codecs
		}
	}
	return errors.Join(errs...)
}
// Compile-time check that *Producer implements the apmqueue.Producer interface.
var _ apmqueue.Producer = &Producer{}

// Producer publishes events to Kafka. Implements the Producer interface.
type Producer struct {
	cfg    ProducerConfig
	client *kgo.Client
	// mu coordinates Produce (read lock) with Close (write lock), so that
	// Close waits for in-flight Produce calls before tearing down the client.
	mu sync.RWMutex
}
// NewProducer returns a new Producer with the given config. The config is
// finalized (validated and defaulted from the environment) first, and each
// configured knob is translated into the corresponding kgo client option.
func NewProducer(cfg ProducerConfig) (*Producer, error) {
	if err := cfg.finalize(); err != nil {
		return nil, fmt.Errorf("kafka: invalid producer config: %w", err)
	}
	// Only emit an option when the corresponding field was actually set,
	// leaving the kgo defaults in place otherwise.
	clientOpts := make([]kgo.Opt, 0, 7)
	if codecs := cfg.CompressionCodec; len(codecs) > 0 {
		clientOpts = append(clientOpts, kgo.ProducerBatchCompression(codecs...))
	}
	if n := cfg.MaxBufferedRecords; n != 0 {
		clientOpts = append(clientOpts, kgo.MaxBufferedRecords(n))
	}
	if maxBytes := cfg.ProducerBatchMaxBytes; maxBytes != 0 {
		clientOpts = append(clientOpts, kgo.ProducerBatchMaxBytes(maxBytes))
	}
	if cfg.ManualFlushing {
		clientOpts = append(clientOpts, kgo.ManualFlushing())
	}
	if listener := cfg.BatchListener; listener != nil {
		clientOpts = append(clientOpts, kgo.WithHooks(listener))
	}
	if partitioner := cfg.RecordPartitioner; partitioner != nil {
		clientOpts = append(clientOpts, kgo.RecordPartitioner(partitioner))
	}
	if cfg.AllowAutoTopicCreation {
		clientOpts = append(clientOpts, kgo.AllowAutoTopicCreation())
	}
	client, err := cfg.newClient(cfg.TopicAttributeFunc, clientOpts...)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating producer: %w", err)
	}
	return &Producer{cfg: cfg, client: client}, nil
}
// Close stops the producer
//
// This call is blocking and will cause all the underlying clients to stop
// producing. If producing is asynchronous, it'll block until all messages
// have been produced. After Close() is called, Producer cannot be reused.
func (p *Producer) Close() error {
	// Take the write lock so Close waits for in-flight Produce calls
	// (which hold the read lock) to finish first.
	p.mu.Lock()
	defer p.mu.Unlock()
	// Close the client even if the flush fails: the contract is that the
	// Producer cannot be reused after Close, so returning without closing
	// would leak the client's connections and background goroutines.
	defer p.client.Close()
	if err := p.client.Flush(context.Background()); err != nil {
		return fmt.Errorf("cannot flush on close: %w", err)
	}
	return nil
}
// Produce produces N records. If the Producer is synchronous, waits until
// all records are produced, otherwise, returns as soon as the records are
// stored in the producer buffer, or when the records are produced to the
// queue if sync producing is configured.
// If the context has been enriched with metadata, each entry will be added
// as a record's header.
// Produce takes ownership of Record and any modifications after Produce is
// called may cause an unhandled exception.
func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error {
	if len(rs) == 0 {
		return nil
	}
	// Take a read lock to prevent Close from closing the client
	// while we're attempting to produce records.
	p.mu.RLock()
	defer p.mu.RUnlock()
	// Copy any context metadata into record headers. The same headers slice
	// is shared by every record produced in this call.
	var headers []kgo.RecordHeader
	if m, ok := queuecontext.MetadataFromContext(ctx); ok {
		headers = make([]kgo.RecordHeader, 0, len(m))
		for k, v := range m {
			headers = append(headers, kgo.RecordHeader{
				Key: k, Value: []byte(v),
			})
		}
	}
	var wg sync.WaitGroup
	wg.Add(len(rs))
	if !p.cfg.Sync {
		// NOTE(review): DetachedContext presumably detaches cancellation so
		// async production is not aborted when the caller's context is
		// canceled after Produce returns — confirm against queuecontext.
		ctx = queuecontext.DetachedContext(ctx)
	}
	namespacePrefix := p.cfg.namespacePrefix()
	for _, record := range rs {
		kgoRecord := &kgo.Record{
			Headers: headers,
			Topic:   fmt.Sprintf("%s%s", namespacePrefix, record.Topic),
			Key:     record.OrderingKey,
			Value:   record.Value,
		}
		// The callback runs once per record with the terminal result.
		p.client.Produce(ctx, kgoRecord, func(r *kgo.Record, err error) {
			defer wg.Done()
			// kotel already marks spans as errors. No need to handle it here.
			if err != nil {
				// Log the topic without the namespace prefix.
				topicName := strings.TrimPrefix(r.Topic, namespacePrefix)
				logger := p.cfg.Logger
				if p.cfg.TopicLogFieldFunc != nil {
					logger = logger.With(p.cfg.TopicLogFieldFunc(topicName))
				}
				logger.Error("failed producing message",
					zap.Error(err),
					zap.String("topic", topicName),
					zap.Int64("offset", r.Offset),
					zap.Int32("partition", r.Partition),
					zap.Any("headers", headers),
				)
			}
			if p.cfg.ProduceCallback != nil {
				p.cfg.ProduceCallback(r, err)
			}
		})
	}
	// Only synchronous producers block for completion; in async mode the
	// WaitGroup is decremented by the callbacks but never waited on here.
	if p.cfg.Sync {
		wg.Wait()
	}
	return nil
}
// Healthy returns an error if the Kafka client fails to reach a discovered
// broker, and nil when the ping succeeds.
func (p *Producer) Healthy(ctx context.Context) error {
	err := p.client.Ping(ctx)
	if err == nil {
		return nil
	}
	return fmt.Errorf("health probe: %w", err)
}