func()

in kafka/manager.go [104:167]


// DeleteTopics deletes the given topics, each prefixed with the configured
// namespace prefix, using the manager's admin client.
//
// If the admin request itself fails, the error is recorded on the span and
// returned wrapped. Otherwise each per-topic response is handled on its own:
//
//   - kerr.UnknownTopicOrPartition is logged at debug level and ignored; no
//     metric is recorded for it.
//   - Any other per-topic error is recorded on the span, counted in the
//     "deleted" metric with outcome=failure, and added to the returned error.
//   - A successful deletion is counted with outcome=success and logged at
//     info level.
//
// The returned error joins all per-topic failures and is nil when there are
// none.
func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) error {
	// TODO(axw) how should we record topics?
	ctx, span := m.tracer.Start(ctx, "DeleteTopics", trace.WithAttributes(
		semconv.MessagingSystemKey.String("kafka"),
	))
	defer span.End()

	namespacePrefix := m.cfg.namespacePrefix()
	topicNames := make([]string, len(topics))
	for i, topic := range topics {
		topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
	}
	responses, err := m.adminClient.DeleteTopics(ctx, topicNames...)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "DeleteTopics returned an error")
		return fmt.Errorf("failed to delete kafka topics: %w", err)
	}

	// recordOutcome increments the deleted-topics counter for a single topic
	// with the given outcome ("success" or "failure"). Both branches of the
	// loop below share it so the attribute set cannot drift between them.
	recordOutcome := func(topic, outcome string) {
		attrs := []attribute.KeyValue{
			semconv.MessagingSystemKey.String("kafka"),
			attribute.String("outcome", outcome),
			attribute.String("topic", topic),
		}
		// TopicAttributeFunc is guarded the same way as TopicLogFieldFunc so
		// that an unset callback cannot cause a nil-function call here.
		// NOTE(review): config finalization may already default this
		// callback; confirm, in which case the guard is merely defensive.
		if m.cfg.TopicAttributeFunc != nil {
			// A zero-value KeyValue means "no extra attribute" and is skipped.
			if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
				attrs = append(attrs, kv)
			}
		}
		// The metric is recorded with a background context rather than ctx,
		// as in the original code.
		m.deleted.Add(context.Background(), 1, metric.WithAttributeSet(
			attribute.NewSet(attrs...),
		))
	}

	var deleteErrors []error
	for _, response := range responses.Sorted() {
		// Report the topic without the namespace prefix in logs, errors and
		// metrics.
		topic := strings.TrimPrefix(response.Topic, namespacePrefix)
		logger := m.cfg.Logger.With(zap.String("topic", topic))
		if m.cfg.TopicLogFieldFunc != nil {
			logger = logger.With(m.cfg.TopicLogFieldFunc(topic))
		}

		if err := response.Err; err != nil {
			if errors.Is(err, kerr.UnknownTopicOrPartition) {
				// A missing topic is not treated as a failure.
				logger.Debug("kafka topic does not exist")
				continue
			}
			span.RecordError(err)
			span.SetStatus(codes.Error, "failed to delete one or more topic")
			deleteErrors = append(deleteErrors,
				fmt.Errorf("failed to delete topic %q: %w", topic, err),
			)
			recordOutcome(topic, "failure")
			continue
		}
		recordOutcome(topic, "success")
		logger.Info("deleted kafka topic")
	}
	return errors.Join(deleteErrors...)
}