in kafka/topiccreator.go [110:288]
func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error {
// TODO(axw) how should we record topics?
ctx, span := c.m.tracer.Start(ctx, "CreateTopics", trace.WithAttributes(
semconv.MessagingSystemKey.String("kafka"),
))
defer span.End()
namespacePrefix := c.m.cfg.namespacePrefix()
topicNames := make([]string, len(topics))
for i, topic := range topics {
topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
}
existing, err := c.m.adminClient.ListTopics(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to list kafka topics: %w", err)
}
// missingTopics contains topics which need to be created.
missingTopics := make([]string, 0, len(topicNames))
// updatePartitions contains topics which partitions' need to be updated.
updatePartitions := make([]string, 0, len(topicNames))
// existingTopics contains the existing topics, used by AlterTopicConfigs.
existingTopics := make([]string, 0, len(topicNames))
for _, wantTopic := range topicNames {
if !existing.Has(wantTopic) {
missingTopics = append(missingTopics, wantTopic)
continue
}
existingTopics = append(existingTopics, wantTopic)
if len(existing[wantTopic].Partitions) < c.partitionCount {
updatePartitions = append(updatePartitions, wantTopic)
}
}
responses, err := c.m.adminClient.CreateTopics(ctx,
int32(c.partitionCount),
-1, // default.replication.factor
c.topicConfigs,
missingTopics...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to create kafka topics: %w", err)
}
loggerFields := []zap.Field{
zap.Int("partition_count", c.partitionCount),
}
if len(c.origTopicConfigs) > 0 {
loggerFields = append(loggerFields,
zap.Reflect("topic_configs", c.origTopicConfigs),
)
}
var updateErrors []error
for _, response := range responses.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName))
}
if err := response.Err; err != nil {
if errors.Is(err, kerr.TopicAlreadyExists) {
// NOTE(axw) apmotel currently does nothing with span events,
// hence we log as well as create a span event.
logger.Debug("kafka topic already exists",
zap.String("topic", topicName),
)
span.AddEvent("kafka topic already exists", trace.WithAttributes(
semconv.MessagingDestinationKey.String(topicName),
))
} else {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to create topic %q: %w", topicName, err,
))
c.created.Add(context.Background(), 1, metric.WithAttributeSet(
attribute.NewSet(
semconv.MessagingSystemKey.String("kafka"),
attribute.String("outcome", "failure"),
attribute.String("topic", topicName),
),
))
}
continue
}
c.created.Add(context.Background(), 1, metric.WithAttributeSet(
attribute.NewSet(
semconv.MessagingSystemKey.String("kafka"),
attribute.String("outcome", "success"),
attribute.String("topic", topicName),
),
))
logger.Info("created kafka topic", zap.String("topic", topicName))
}
// Update the topic partitions.
if len(updatePartitions) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx,
c.partitionCount,
updatePartitions...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to update partitions for kafka topics: %v: %w",
updatePartitions, err,
)
}
for _, response := range updateResp.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName))
}
if errors.Is(response.Err, kerr.InvalidRequest) {
// If UpdatePartitions partition count isn't greater than the
// current number of partitions, each individual response
// returns `INVALID_REQUEST`.
continue
}
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to update partitions for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("updated partitions for kafka topic",
zap.String("topic", topicName),
)
}
}
if len(existingTopics) > 0 && len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx,
alterCfg, existingTopics...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
existingTopics, err,
)
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName))
}
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
}
}
return errors.Join(updateErrors...)
}