kafka/topiccreator.go (226 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package kafka import ( "context" "errors" "fmt" "strings" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kerr" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" apmqueue "github.com/elastic/apm-queue/v2" ) // TopicCreatorConfig holds configuration for creating Kafka topics. type TopicCreatorConfig struct { // PartitionCount is the number of partitions to assign to // newly created topics. // // Must be non-zero. If PartitonCount is -1, the broker's // default value (requires Kafka 2.4+). PartitionCount int // TopicConfigs holds any topic configs to assign to newly // created topics, such as `retention.ms`. // // See https://kafka.apache.org/documentation/#topicconfigs TopicConfigs map[string]string // MeterProvider used to create meters and record metrics (Optional). MeterProvider metric.MeterProvider } // Validate ensures the configuration is valid, returning an error otherwise. func (cfg TopicCreatorConfig) Validate() error { var errs []error if cfg.PartitionCount == 0 { errs = append(errs, errors.New("kafka: partition count must be non-zero")) } return errors.Join(errs...) } // TopicCreator creates Kafka topics. type TopicCreator struct { m *Manager partitionCount int origTopicConfigs map[string]string topicConfigs map[string]*string created metric.Int64Counter } // NewTopicCreator returns a new TopicCreator with the given config. func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error) { if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("kafka: invalid topic creator config: %w", err) } if cfg.MeterProvider == nil { cfg.MeterProvider = otel.GetMeterProvider() } meter := cfg.MeterProvider.Meter("github.com/elastic/apm-queue/kafka") created, err := meter.Int64Counter("topics.created.count", metric.WithDescription("The number of created topics"), ) if err != nil { return nil, fmt.Errorf("failed creating 'topics.created.count' metric: %w", err) } var topicConfigs map[string]*string if len(cfg.TopicConfigs) != 0 { topicConfigs = make(map[string]*string, len(cfg.TopicConfigs)) for k, v := range cfg.TopicConfigs { topicConfigs[k] = kadm.StringPtr(v) } } return &TopicCreator{ m: m, partitionCount: cfg.PartitionCount, origTopicConfigs: cfg.TopicConfigs, topicConfigs: topicConfigs, created: created, }, nil } // CreateTopics creates one or more topics. // // Topics that already exist will be updated. func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error { // TODO(axw) how should we record topics? ctx, span := c.m.tracer.Start(ctx, "CreateTopics", trace.WithAttributes( semconv.MessagingSystemKey.String("kafka"), )) defer span.End() namespacePrefix := c.m.cfg.namespacePrefix() topicNames := make([]string, len(topics)) for i, topic := range topics { topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic) } existing, err := c.m.adminClient.ListTopics(ctx) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to list kafka topics: %w", err) } // missingTopics contains topics which need to be created. missingTopics := make([]string, 0, len(topicNames)) // updatePartitions contains topics which partitions' need to be updated. updatePartitions := make([]string, 0, len(topicNames)) // existingTopics contains the existing topics, used by AlterTopicConfigs. existingTopics := make([]string, 0, len(topicNames)) for _, wantTopic := range topicNames { if !existing.Has(wantTopic) { missingTopics = append(missingTopics, wantTopic) continue } existingTopics = append(existingTopics, wantTopic) if len(existing[wantTopic].Partitions) < c.partitionCount { updatePartitions = append(updatePartitions, wantTopic) } } responses, err := c.m.adminClient.CreateTopics(ctx, int32(c.partitionCount), -1, // default.replication.factor c.topicConfigs, missingTopics..., ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to create kafka topics: %w", err) } loggerFields := []zap.Field{ zap.Int("partition_count", c.partitionCount), } if len(c.origTopicConfigs) > 0 { loggerFields = append(loggerFields, zap.Reflect("topic_configs", c.origTopicConfigs), ) } var updateErrors []error for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) } if err := response.Err; err != nil { if errors.Is(err, kerr.TopicAlreadyExists) { // NOTE(axw) apmotel currently does nothing with span events, // hence we log as well as create a span event. logger.Debug("kafka topic already exists", zap.String("topic", topicName), ) span.AddEvent("kafka topic already exists", trace.WithAttributes( semconv.MessagingDestinationKey.String(topicName), )) } else { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) updateErrors = append(updateErrors, fmt.Errorf( "failed to create topic %q: %w", topicName, err, )) c.created.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet( semconv.MessagingSystemKey.String("kafka"), attribute.String("outcome", "failure"), attribute.String("topic", topicName), ), )) } continue } c.created.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet( semconv.MessagingSystemKey.String("kafka"), attribute.String("outcome", "success"), attribute.String("topic", topicName), ), )) logger.Info("created kafka topic", zap.String("topic", topicName)) } // Update the topic partitions. if len(updatePartitions) > 0 { updateResp, err := c.m.adminClient.UpdatePartitions(ctx, c.partitionCount, updatePartitions..., ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to update partitions for kafka topics: %v: %w", updatePartitions, err, ) } for _, response := range updateResp.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) } if errors.Is(response.Err, kerr.InvalidRequest) { // If UpdatePartitions partition count isn't greater than the // current number of partitions, each individual response // returns `INVALID_REQUEST`. continue } if err := response.Err; err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) updateErrors = append(updateErrors, fmt.Errorf( "failed to update partitions for topic %q: %w", topicName, err, )) continue } logger.Info("updated partitions for kafka topic", zap.String("topic", topicName), ) } } if len(existingTopics) > 0 && len(c.topicConfigs) > 0 { alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) for k, v := range c.topicConfigs { alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) } alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg, existingTopics..., ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf( "failed to update configuration for kafka topics: %v:%w", existingTopics, err, ) } for _, response := range alterResp { topicName := strings.TrimPrefix(response.Name, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) } if err := response.Err; err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) updateErrors = append(updateErrors, fmt.Errorf( "failed to alter configuration for topic %q: %w", topicName, err, )) continue } logger.Info("altered configuration for kafka topic", zap.String("topic", topicName), ) } } return errors.Join(updateErrors...) }