in kafka/manager.go [104:167]
// DeleteTopics deletes the given topics, each resolved to its
// namespace-prefixed Kafka name, using a single admin client request.
//
// If the admin request itself fails, the error is recorded on the span and
// returned wrapped; no per-topic metrics are recorded in that case.
//
// Otherwise each per-topic response is handled individually:
//   - success: the deletion counter is incremented with outcome=success and
//     an info log is emitted;
//   - kerr.UnknownTopicOrPartition: logged at debug level only; it is not
//     counted and not reported as an error;
//   - any other error: recorded on the span, counted with outcome=failure,
//     and included in the returned error.
//
// The per-topic errors are combined with errors.Join, so the result is nil
// when no topic failed.
func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) error {
	// TODO(axw) how should we record topics?
	ctx, span := m.tracer.Start(ctx, "DeleteTopics", trace.WithAttributes(
		semconv.MessagingSystemKey.String("kafka"),
	))
	defer span.End()

	namespacePrefix := m.cfg.namespacePrefix()
	topicNames := make([]string, len(topics))
	for i, topic := range topics {
		topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
	}

	responses, err := m.adminClient.DeleteTopics(ctx, topicNames...)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "DeleteTopics returned an error")
		return fmt.Errorf("failed to delete kafka topics: %w", err)
	}

	// recordOutcome increments the deletion counter for topic with the given
	// outcome ("success" or "failure"), attaching the optional custom topic
	// attribute when one is configured and non-empty.
	recordOutcome := func(topic, outcome string) {
		attrs := []attribute.KeyValue{
			semconv.MessagingSystemKey.String("kafka"),
			attribute.String("outcome", outcome),
			attribute.String("topic", topic),
		}
		// Guard against an unset hook, mirroring the TopicLogFieldFunc
		// handling below, so a missing hook cannot cause a nil-func panic.
		if attrFunc := m.cfg.TopicAttributeFunc; attrFunc != nil {
			if kv := attrFunc(topic); kv != (attribute.KeyValue{}) {
				attrs = append(attrs, kv)
			}
		}
		m.deleted.Add(context.Background(), 1, metric.WithAttributeSet(
			attribute.NewSet(attrs...),
		))
	}

	var deleteErrors []error
	for _, response := range responses.Sorted() {
		// Report the topic under its un-prefixed name in logs, metrics and
		// error messages.
		topic := strings.TrimPrefix(response.Topic, namespacePrefix)
		logger := m.cfg.Logger.With(zap.String("topic", topic))
		if m.cfg.TopicLogFieldFunc != nil {
			logger = logger.With(m.cfg.TopicLogFieldFunc(topic))
		}

		switch err := response.Err; {
		case err == nil:
			recordOutcome(topic, "success")
			logger.Info("deleted kafka topic")
		case errors.Is(err, kerr.UnknownTopicOrPartition):
			// A missing topic is not a failure and is not counted.
			logger.Debug("kafka topic does not exist")
		default:
			span.RecordError(err)
			span.SetStatus(codes.Error, "failed to delete one or more topic")
			deleteErrors = append(deleteErrors,
				fmt.Errorf("failed to delete topic %q: %w", topic, err),
			)
			recordOutcome(topic, "failure")
		}
	}
	return errors.Join(deleteErrors...)
}