in kafka/metrics.go [567:586]
// OnFetchRecordUnbuffered records, on h.messageDelay, the number of seconds
// elapsed since the record's timestamp. Nothing is recorded unless polled is
// true.
//
// The measurement carries the attributes returned by attributesFromRecord
// (seeded with the full topic name, the topic name with h.topicPrefix trimmed,
// and the partition), plus the attribute returned by h.topicAttributeFunc when
// it is not the zero KeyValue, plus a "namespace" attribute when h.namespace
// is non-empty.
func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) {
	// Record metrics only when polled by `client.PollRecords()`.
	if !polled {
		return
	}

	sourceName := strings.TrimPrefix(r.Topic, h.topicPrefix)
	attrs := attributesFromRecord(r,
		attribute.String("topic", r.Topic),
		semconv.MessagingSourceName(sourceName),
		semconv.MessagingKafkaSourcePartition(int(r.Partition)),
	)

	// A zero-value KeyValue from the topic attribute function means
	// "nothing to add" and is skipped.
	var none attribute.KeyValue
	if extra := h.topicAttributeFunc(r.Topic); extra != none {
		attrs = append(attrs, extra)
	}
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	delaySeconds := time.Since(r.Timestamp).Seconds()
	attrSet := attribute.NewSet(attrs...)
	h.messageDelay.Record(context.Background(), delaySeconds, metric.WithAttributeSet(attrSet))
}