func()

in kafka/metrics.go [567:586]


func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) {
	if !polled {
		return // Record metrics when polled by `client.PollRecords()`.
	}
	attrs := attributesFromRecord(r,
		attribute.String("topic", r.Topic),
		semconv.MessagingSourceName(strings.TrimPrefix(r.Topic, h.topicPrefix)),
		semconv.MessagingKafkaSourcePartition(int(r.Partition)),
	)
	if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) {
		attrs = append(attrs, kv)
	}
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}
	h.messageDelay.Record(context.Background(),
		time.Since(r.Timestamp).Seconds(),
		metric.WithAttributeSet(attribute.NewSet(attrs...)),
	)
}