in kafka/metrics.go [489:525]
// OnFetchBatchRead records consumer metrics for a single fetched record batch.
//
// Every measurement carries the same attributes: the messaging system
// ("kafka"), the full topic name, the topic name with h.topicPrefix trimmed,
// the partition, and the batch compression codec. The attribute returned by
// h.topicAttributeFunc is appended when it is not the zero KeyValue, and a
// "namespace" attribute is appended when h.namespace is non-empty.
//
// The batch's record count is added to messageFetched and fetchRecords, its
// compressed size to messageFetchedWireBytes, and its uncompressed size to
// messageFetchedUncompressedBytes and fetchBytes.
func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata,
	topic string, partition int32, m kgo.FetchBatchMetrics,
) {
	// Five fixed attributes plus up to two optional ones (topic attribute
	// and namespace), so size for seven to avoid a reallocation.
	attrs := make([]attribute.KeyValue, 0, 7)
	attrs = append(attrs, semconv.MessagingSystem("kafka"),
		attribute.String("topic", topic),
		semconv.MessagingSourceName(strings.TrimPrefix(topic, h.topicPrefix)),
		semconv.MessagingKafkaSourcePartition(int(partition)),
		attribute.String("compression.codec", compressionFromCodec(m.CompressionType)),
	)
	if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
		attrs = append(attrs, kv)
	}
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	// Build the attribute set once and share it across all instruments:
	// attribute.NewSet sorts and de-duplicates its input on every call, so
	// constructing it per instrument repeats the same work for each batch.
	opt := metric.WithAttributeSet(attribute.NewSet(attrs...))
	ctx := context.Background()

	records := int64(m.NumRecords)
	wireBytes := int64(m.CompressedBytes)
	rawBytes := int64(m.UncompressedBytes)

	h.messageFetched.Add(ctx, records, opt)
	h.messageFetchedWireBytes.Add(ctx, wireBytes, opt)
	h.messageFetchedUncompressedBytes.Add(ctx, rawBytes, opt)
	h.fetchBytes.Add(ctx, rawBytes, opt)
	h.fetchRecords.Add(ctx, records, opt)
}