kafka/manager.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
	"context"
	"errors"
	"fmt"
	"regexp"
	"strings"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/metric"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	apmqueue "github.com/elastic/apm-queue/v2"
)

// ManagerConfig holds configuration for managing Kafka topics.
type ManagerConfig struct {
CommonConfig
}

// finalize ensures the configuration is valid, setting default values from
// environment variables as described in doc comments, returning an error if
// any configuration is invalid.
func (cfg *ManagerConfig) finalize() error {
var errs []error
if err := cfg.CommonConfig.finalize(); err != nil {
errs = append(errs, err)
}
return errors.Join(errs...)
}

// Manager manages Kafka topics.
type Manager struct {
cfg ManagerConfig
client *kgo.Client
adminClient *kadm.Client
tracer trace.Tracer
deleted metric.Int64Counter
}

// NewManager returns a new Manager with the given config.
func NewManager(cfg ManagerConfig) (*Manager, error) {
if err := cfg.finalize(); err != nil {
return nil, fmt.Errorf("kafka: invalid manager config: %w", err)
}
client, err := cfg.newClient(nil)
if err != nil {
return nil, fmt.Errorf("kafka: failed creating kafka client: %w", err)
}
if cfg.MeterProvider == nil {
cfg.MeterProvider = otel.GetMeterProvider()
}
meter := cfg.MeterProvider.Meter("github.com/elastic/apm-queue/kafka")
deleted, err := meter.Int64Counter("topics.deleted.count",
metric.WithDescription("The number of deleted topics"),
)
if err != nil {
return nil, fmt.Errorf("failed creating 'topics.deleted.count' metric: %w", err)
}
return &Manager{
cfg: cfg,
client: client,
adminClient: kadm.NewClient(client),
tracer: cfg.tracerProvider().Tracer("kafka"),
deleted: deleted,
}, nil
}
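
// Usage sketch (illustrative, not part of the original file): constructing a
// Manager and releasing its resources when done. The CommonConfig fields shown
// (Brokers, Logger) are assumptions about the embedded configuration; see
// CommonConfig for the authoritative field set.
//
//	manager, err := NewManager(ManagerConfig{
//		CommonConfig: CommonConfig{
//			Brokers: []string{"localhost:9092"},
//			Logger:  zap.NewNop(),
//		},
//	})
//	if err != nil {
//		// Handle the invalid configuration or client creation error.
//	}
//	defer manager.Close()
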
// Close closes the manager's resources, including its connections to the
// Kafka brokers and any associated goroutines.
func (m *Manager) Close() error {
m.client.Close()
return nil
}

// DeleteTopics deletes one or more topics.
//
// No error is returned for topics that do not exist.
func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) error {
// TODO(axw) how should we record topics?
ctx, span := m.tracer.Start(ctx, "DeleteTopics", trace.WithAttributes(
semconv.MessagingSystemKey.String("kafka"),
))
defer span.End()
namespacePrefix := m.cfg.namespacePrefix()
topicNames := make([]string, len(topics))
for i, topic := range topics {
topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
}
responses, err := m.adminClient.DeleteTopics(ctx, topicNames...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "DeleteTopics returned an error")
return fmt.Errorf("failed to delete kafka topics: %w", err)
}
var deleteErrors []error
for _, response := range responses.Sorted() {
topic := strings.TrimPrefix(response.Topic, namespacePrefix)
logger := m.cfg.Logger.With(zap.String("topic", topic))
if m.cfg.TopicLogFieldFunc != nil {
logger = logger.With(m.cfg.TopicLogFieldFunc(topic))
}
if err := response.Err; err != nil {
if errors.Is(err, kerr.UnknownTopicOrPartition) {
logger.Debug("kafka topic does not exist")
} else {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to delete one or more topic")
deleteErrors = append(deleteErrors,
fmt.Errorf("failed to delete topic %q: %w", topic, err),
)
attrs := []attribute.KeyValue{
semconv.MessagingSystemKey.String("kafka"),
attribute.String("outcome", "failure"),
attribute.String("topic", topic),
}
if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
m.deleted.Add(context.Background(), 1, metric.WithAttributeSet(
attribute.NewSet(attrs...),
))
}
continue
}
attrs := []attribute.KeyValue{
semconv.MessagingSystemKey.String("kafka"),
attribute.String("outcome", "success"),
attribute.String("topic", topic),
}
if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
m.deleted.Add(context.Background(), 1, metric.WithAttributeSet(
attribute.NewSet(attrs...),
))
logger.Info("deleted kafka topic")
}
return errors.Join(deleteErrors...)
}
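
// Usage sketch: topics are passed by their un-namespaced names; the Manager
// prepends the configured namespace prefix before issuing the delete request,
// and topics that do not exist are not treated as errors.
//
//	if err := manager.DeleteTopics(ctx, "topic-a", "topic-b"); err != nil {
//		// At least one existing topic could not be deleted.
//	}
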
// Healthy returns an error if the Kafka client fails to reach a discovered broker.
func (m *Manager) Healthy(ctx context.Context) error {
if err := m.client.Ping(ctx); err != nil {
return fmt.Errorf("failed to ping kafka brokers: %w", err)
}
return nil
}
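
// Usage sketch: Healthy only fails when the client cannot ping the discovered
// brokers, which makes it suitable for a readiness or liveness probe.
//
//	if err := manager.Healthy(ctx); err != nil {
//		// Report the instance as unhealthy.
//	}
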
type memberTopic struct {
clientID string
topic string
}

type regexConsumer struct {
regex *regexp.Regexp
consumer string
}

// MonitorConsumerLag registers a callback with OpenTelemetry to measure
// consumer group lag for the given topic and consumer pairs.
func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error) {
monitorTopicConsumers := make(map[apmqueue.TopicConsumer]struct{}, len(topicConsumers))
var regex []regexConsumer
for _, tc := range topicConsumers {
monitorTopicConsumers[tc] = struct{}{}
if tc.Regex != "" {
re, err := regexp.Compile(tc.Regex)
if err != nil {
return nil, fmt.Errorf("failed to compile regex %s: %w",
tc.Regex, err,
)
}
regex = append(regex, regexConsumer{
regex: re,
consumer: tc.Consumer,
})
}
}
mp := m.cfg.meterProvider()
meter := mp.Meter("github.com/elastic/apm-queue/kafka")
consumerGroupLagMetric, err := meter.Int64ObservableGauge("consumer_group_lag")
if err != nil {
return nil, fmt.Errorf("kafka: failed to create consumer_group_lag metric: %w", err)
}
assignmentMetric, err := meter.Int64ObservableGauge("consumer_group_assignment")
if err != nil {
return nil, fmt.Errorf("kafka: failed to create consumer_group.assignment metric: %w", err)
}
namespacePrefix := m.cfg.namespacePrefix()
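	// gatherMetrics is invoked by the OpenTelemetry SDK on each collection
	// cycle: it queries consumer group lag once, then reports per-partition
	// lag and per-member partition assignment counts for the monitored
	// topic and consumer pairs.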
gatherMetrics := func(ctx context.Context, o metric.Observer) error {
ctx, span := m.tracer.Start(ctx, "GatherMetrics")
defer span.End()
consumerSet := make(map[string]struct{})
for _, tc := range topicConsumers {
consumerSet[tc.Consumer] = struct{}{}
}
consumers := make([]string, 0, len(consumerSet))
for consumer := range consumerSet {
consumers = append(consumers, consumer)
}
m.cfg.Logger.Debug("reporting consumer lag", zap.Strings("consumers", consumers))
lag, err := m.adminClient.Lag(ctx, consumers...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to calculate consumer lag: %w", err)
}
lag.Each(func(l kadm.DescribedGroupLag) {
if err := l.Error(); err != nil {
m.cfg.Logger.Warn("error calculating consumer lag",
zap.String("group", l.Group),
zap.Error(err),
)
return
}
// Map Consumer group member assignments.
memberAssignments := make(map[memberTopic]int64)
for topic, partitions := range l.Lag {
if !strings.HasPrefix(topic, namespacePrefix) {
// Ignore topics outside the namespace.
continue
}
topic = topic[len(namespacePrefix):]
logger := m.cfg.Logger
if m.cfg.TopicLogFieldFunc != nil {
logger = logger.With(m.cfg.TopicLogFieldFunc(topic))
}
var matchesRegex bool
for _, re := range regex {
if l.Group == re.consumer && re.regex.MatchString(topic) {
matchesRegex = true
break
}
}
if _, ok := monitorTopicConsumers[apmqueue.TopicConsumer{
Topic: apmqueue.Topic(topic),
Consumer: l.Group,
}]; !ok && !matchesRegex {
					// Skip the topic unless it was requested explicitly or matches a regex.
continue
}
for partition, lag := range partitions {
if lag.Err != nil {
logger.Warn("error getting consumer group lag",
zap.String("group", l.Group),
zap.String("topic", topic),
zap.Int32("partition", partition),
zap.Error(lag.Err),
)
continue
}
					// If the group is in the Empty state, lag.Member is nil.
					clientID := "nil"
if lag.Member != nil {
clientID = lag.Member.ClientID
}
key := memberTopic{topic: topic, clientID: clientID}
count := memberAssignments[key]
count++
memberAssignments[key] = count
attrs := []attribute.KeyValue{
attribute.String("group", l.Group),
attribute.String("topic", topic),
attribute.Int("partition", int(partition)),
}
if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
o.ObserveInt64(
consumerGroupLagMetric, lag.Lag,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
}
for key, count := range memberAssignments {
attrs := []attribute.KeyValue{
attribute.String("group", l.Group),
attribute.String("topic", key.topic),
attribute.String("client_id", key.clientID),
}
if kv := m.cfg.TopicAttributeFunc(key.topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet(
attribute.NewSet(attrs...),
))
}
})
return nil
}
return meter.RegisterCallback(gatherMetrics, consumerGroupLagMetric,
assignmentMetric,
)
}
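
// Usage sketch (field names Topic, Consumer, and Regex are taken from the
// apmqueue.TopicConsumer usage above): monitoring one explicit topic and one
// regex-matched set of topics, then unregistering the callback on shutdown.
//
//	reg, err := manager.MonitorConsumerLag([]apmqueue.TopicConsumer{
//		{Topic: "events", Consumer: "event-processor"},
//		{Regex: "^logs-", Consumer: "log-processor"},
//	})
//	if err != nil {
//		// Handle metric registration failure.
//	}
//	defer reg.Unregister()
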
// CreateACLs creates the specified ACLs in the Kafka cluster.
func (m *Manager) CreateACLs(ctx context.Context, acls *kadm.ACLBuilder) error {
res, err := m.adminClient.CreateACLs(ctx, acls)
if err != nil {
return fmt.Errorf("failed to create ACLs: %w", err)
}
var errs []error
for _, r := range res {
if r.Err != nil {
errs = append(errs, r.Err)
}
}
return errors.Join(errs...)
}
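
// Usage sketch: building ACLs with franz-go's kadm.ACLBuilder and applying
// them through the Manager. The builder calls shown (NewACLs, Allow, Topics,
// Operations) are believed to match the kadm API, but verify them against the
// franz-go version pinned by this module.
//
//	acls := kadm.NewACLs().
//		Allow("User:apm-server").
//		Topics("events").
//		Operations(kadm.OpRead, kadm.OpWrite)
//	if err := manager.CreateACLs(ctx, acls); err != nil {
//		// One or more ACLs could not be created.
//	}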