func()

in kafka/metrics.go [489:525]


// OnFetchBatchRead records metrics for one fetched batch of the given topic
// and partition. Presumably invoked by the kgo client as its fetch-batch-read
// hook (verify against where metricHooks is registered).
//
// The broker metadata is ignored. Every measurement carries the same
// attributes: the messaging system ("kafka"), the raw topic name, the topic
// name with h.topicPrefix trimmed, the partition, the batch compression codec,
// plus the optional attribute returned by h.topicAttributeFunc and the
// optional namespace.
//
// The record count from m is added to messageFetched and fetchRecords, the
// compressed byte count to messageFetchedWireBytes, and the uncompressed byte
// count to messageFetchedUncompressedBytes and fetchBytes.
func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata,
	topic string, partition int32, m kgo.FetchBatchMetrics,
) {
	// Five attributes are always present and up to two more are optional,
	// so size for seven to avoid regrowing the slice.
	attrs := make([]attribute.KeyValue, 0, 7)
	attrs = append(attrs, semconv.MessagingSystem("kafka"),
		attribute.String("topic", topic),
		semconv.MessagingSourceName(strings.TrimPrefix(topic, h.topicPrefix)),
		semconv.MessagingKafkaSourcePartition(int(partition)),
		attribute.String("compression.codec", compressionFromCodec(m.CompressionType)),
	)
	// A zero-value KeyValue from topicAttributeFunc means "no extra attribute".
	if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
		attrs = append(attrs, kv)
	}
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	// Build the attribute set once: all instruments below share the exact same
	// attributes, so there is no need to construct the set per measurement.
	ctx := context.Background()
	withAttrs := metric.WithAttributeSet(attribute.NewSet(attrs...))

	h.messageFetched.Add(ctx, int64(m.NumRecords), withAttrs)
	h.messageFetchedWireBytes.Add(ctx, int64(m.CompressedBytes), withAttrs)
	h.messageFetchedUncompressedBytes.Add(ctx, int64(m.UncompressedBytes), withAttrs)

	h.fetchBytes.Add(ctx, int64(m.UncompressedBytes), withAttrs)
	h.fetchRecords.Add(ctx, int64(m.NumRecords), withAttrs)
}