in kafka/metrics.go [531:563]
// OnProduceRecordUnbuffered counts a record that failed to be produced.
//
// A nil err is ignored here because successful records are accounted for by
// OnProduceBatchWritten. For a non-nil err, messageProduced is incremented by
// one with outcome "failure", the record's topic and partition attributes,
// the optional topic and namespace attributes, and an error reason derived
// from err: "timeout" for context.DeadlineExceeded, "canceled" for
// context.Canceled, the Kafka error message for a *kerr.Error, and "unknown"
// for anything else.
func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
	if err == nil {
		// Successes are reported through OnProduceBatchWritten.
		return
	}

	// The destination name is the topic with the configured prefix removed;
	// the raw topic is still reported under the "topic" key.
	destination := strings.TrimPrefix(r.Topic, h.topicPrefix)
	kvs := attributesFromRecord(r,
		attribute.String("topic", r.Topic),
		semconv.MessagingDestinationName(destination),
		semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
		attribute.String("outcome", "failure"),
	)

	// Optional attributes: a zero KeyValue from topicAttributeFunc and an
	// empty namespace both mean there is nothing to add.
	var none attribute.KeyValue
	if topicKV := h.topicAttributeFunc(r.Topic); topicKV != none {
		kvs = append(kvs, topicKV)
	}
	if h.namespace != "" {
		kvs = append(kvs, attribute.String("namespace", h.namespace))
	}

	// Classify the failure. Context errors are checked before Kafka protocol
	// errors, so a wrapped deadline or cancellation takes precedence.
	reason := "unknown"
	var kafkaErr *kerr.Error
	if errors.Is(err, context.DeadlineExceeded) {
		reason = "timeout"
	} else if errors.Is(err, context.Canceled) {
		reason = "canceled"
	} else if errors.As(err, &kafkaErr) {
		reason = kafkaErr.Message
	}
	kvs = append(kvs, attribute.String(errorReasonKey, reason))

	set := attribute.NewSet(kvs...)
	h.messageProduced.Add(context.Background(), 1, metric.WithAttributeSet(set))
}