func()

in kafka/metrics.go [531:563]


func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
	if err == nil {
		return // Covered by OnProduceBatchWritten.
	}
	attrs := attributesFromRecord(r,
		attribute.String("topic", r.Topic),
		semconv.MessagingDestinationName(strings.TrimPrefix(r.Topic, h.topicPrefix)),
		semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
		attribute.String("outcome", "failure"),
	)
	if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) {
		attrs = append(attrs, kv)
	}
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	var kgoErr *kerr.Error
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		attrs = append(attrs, attribute.String(errorReasonKey, "timeout"))
	case errors.Is(err, context.Canceled):
		attrs = append(attrs, attribute.String(errorReasonKey, "canceled"))
	case errors.As(err, &kgoErr):
		attrs = append(attrs, attribute.String(errorReasonKey, kgoErr.Message))
	default:
		attrs = append(attrs, attribute.String(errorReasonKey, "unknown"))
	}

	h.messageProduced.Add(context.Background(), 1, metric.WithAttributeSet(
		attribute.NewSet(attrs...),
	))
}