func()

in kafka/topiccreator.go [110:288]


// CreateTopics ensures that the given topics exist and are aligned with the
// creator's partition count and topic configuration.
//
// Each topic name is prefixed with the configured namespace prefix before it
// is sent to Kafka; the prefix is trimmed again for logs, metrics and error
// messages. The method then:
//
//  1. Lists the existing topics and creates the ones that are missing, using
//     c.partitionCount, the broker's default replication factor (-1) and
//     c.topicConfigs.
//  2. Updates the partition count of existing topics that currently have
//     fewer than c.partitionCount partitions.
//  3. Alters the configuration of the existing topics when c.topicConfigs is
//     not empty.
//
// Per-topic failures do not stop processing: they are recorded on the span,
// collected, and returned joined together (see errors.Join). A failure of a
// whole admin request aborts the remaining steps; the returned error then
// also carries any per-topic errors collected before the failed request.
// A nil error is returned when nothing failed.
func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error {
	// TODO(axw) how should we record topics?
	ctx, span := c.m.tracer.Start(ctx, "CreateTopics", trace.WithAttributes(
		semconv.MessagingSystemKey.String("kafka"),
	))
	defer span.End()

	namespacePrefix := c.m.cfg.namespacePrefix()
	topicNames := make([]string, len(topics))
	for i, topic := range topics {
		topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
	}

	existing, err := c.m.adminClient.ListTopics(ctx)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return fmt.Errorf("failed to list kafka topics: %w", err)
	}

	// missingTopics contains topics which need to be created.
	missingTopics := make([]string, 0, len(topicNames))
	// updatePartitions contains topics whose partitions need to be updated.
	updatePartitions := make([]string, 0, len(topicNames))
	// existingTopics contains the existing topics, used by AlterTopicConfigs.
	existingTopics := make([]string, 0, len(topicNames))
	for _, wantTopic := range topicNames {
		if !existing.Has(wantTopic) {
			missingTopics = append(missingTopics, wantTopic)
			continue
		}
		existingTopics = append(existingTopics, wantTopic)
		if len(existing[wantTopic].Partitions) < c.partitionCount {
			updatePartitions = append(updatePartitions, wantTopic)
		}
	}

	responses, err := c.m.adminClient.CreateTopics(ctx,
		int32(c.partitionCount),
		-1, // default.replication.factor
		c.topicConfigs,
		missingTopics...,
	)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return fmt.Errorf("failed to create kafka topics: %w", err)
	}
	loggerFields := []zap.Field{
		zap.Int("partition_count", c.partitionCount),
	}
	if len(c.origTopicConfigs) > 0 {
		loggerFields = append(loggerFields,
			zap.Reflect("topic_configs", c.origTopicConfigs),
		)
	}

	// updateErrors accumulates the per-topic failures of all steps below.
	var updateErrors []error
	// withPending returns err on its own when no per-topic errors have been
	// collected yet, otherwise it joins them with err so that an aborted
	// request does not hide the failures that were already observed.
	withPending := func(err error) error {
		if len(updateErrors) == 0 {
			return err
		}
		return errors.Join(append(updateErrors, err)...)
	}

	for _, response := range responses.Sorted() {
		topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
		logger := c.m.cfg.Logger.With(loggerFields...)
		if c.m.cfg.TopicLogFieldFunc != nil {
			logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName))
		}
		if err := response.Err; err != nil {
			if errors.Is(err, kerr.TopicAlreadyExists) {
				// NOTE(axw) apmotel currently does nothing with span events,
				// hence we log as well as create a span event.
				logger.Debug("kafka topic already exists",
					zap.String("topic", topicName),
				)
				span.AddEvent("kafka topic already exists", trace.WithAttributes(
					semconv.MessagingDestinationKey.String(topicName),
				))
			} else {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
				updateErrors = append(updateErrors, fmt.Errorf(
					"failed to create topic %q: %w", topicName, err,
				))
				c.created.Add(context.Background(), 1, metric.WithAttributeSet(
					attribute.NewSet(
						semconv.MessagingSystemKey.String("kafka"),
						attribute.String("outcome", "failure"),
						attribute.String("topic", topicName),
					),
				))
			}
			continue
		}
		c.created.Add(context.Background(), 1, metric.WithAttributeSet(
			attribute.NewSet(
				semconv.MessagingSystemKey.String("kafka"),
				attribute.String("outcome", "success"),
				attribute.String("topic", topicName),
			),
		))
		logger.Info("created kafka topic", zap.String("topic", topicName))
	}

	// Update the topic partitions.
	if len(updatePartitions) > 0 {
		updateResp, err := c.m.adminClient.UpdatePartitions(ctx,
			c.partitionCount,
			updatePartitions...,
		)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			// Keep the topic creation errors collected so far.
			return withPending(fmt.Errorf(
				"failed to update partitions for kafka topics: %v: %w",
				updatePartitions, err,
			))
		}
		for _, response := range updateResp.Sorted() {
			topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
			logger := c.m.cfg.Logger.With(loggerFields...)
			if c.m.cfg.TopicLogFieldFunc != nil {
				logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName))
			}

			if errors.Is(response.Err, kerr.InvalidRequest) {
				// If UpdatePartitions partition count isn't greater than the
				// current number of partitions, each individual response
				// returns `INVALID_REQUEST`.
				continue
			}
			if err := response.Err; err != nil {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
				updateErrors = append(updateErrors, fmt.Errorf(
					"failed to update partitions for topic %q: %w",
					topicName, err,
				))
				continue
			}
			logger.Info("updated partitions for kafka topic",
				zap.String("topic", topicName),
			)
		}
	}

	// Apply the configured topic settings to the topics that already existed;
	// newly created topics received them as part of CreateTopics.
	if len(existingTopics) > 0 && len(c.topicConfigs) > 0 {
		alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
		for k, v := range c.topicConfigs {
			alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
		}
		alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx,
			alterCfg, existingTopics...,
		)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			// Keep the creation / partition update errors collected so far.
			return withPending(fmt.Errorf(
				"failed to update configuration for kafka topics: %v: %w",
				existingTopics, err,
			))
		}
		for _, response := range alterResp {
			topicName := strings.TrimPrefix(response.Name, namespacePrefix)
			logger := c.m.cfg.Logger.With(loggerFields...)
			if c.m.cfg.TopicLogFieldFunc != nil {
				logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName))
			}

			if err := response.Err; err != nil {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
				updateErrors = append(updateErrors, fmt.Errorf(
					"failed to alter configuration for topic %q: %w",
					topicName, err,
				))
				continue
			}
			logger.Info("altered configuration for kafka topic",
				zap.String("topic", topicName),
			)
		}
	}
	// errors.Join returns nil when no per-topic error was collected.
	return errors.Join(updateErrors...)
}