in kafka/metrics.go [447:485]
// OnProduceBatchWritten updates the produce instruments with the record and
// byte counts reported in m for a batch written to the given topic and
// partition. Every measurement is recorded with outcome "success" and shares
// the same attribute set.
//
// The broker metadata argument is ignored.
func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata,
	topic string, partition int32, m kgo.ProduceBatchMetrics,
) {
	// Six attributes are always present; the topic attribute and the
	// namespace are optional, so size for the worst case of eight to avoid
	// growing the slice.
	attrs := make([]attribute.KeyValue, 0, 8)
	attrs = append(attrs, semconv.MessagingSystem("kafka"),
		attribute.String("topic", topic),
		semconv.MessagingDestinationName(strings.TrimPrefix(topic, h.topicPrefix)),
		semconv.MessagingKafkaDestinationPartition(int(partition)),
		attribute.String("outcome", "success"),
		attribute.String("compression.codec", compressionFromCodec(m.CompressionType)),
	)
	// A zero-value KeyValue from topicAttributeFunc means "no extra attribute".
	if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
		attrs = append(attrs, kv)
	}
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	// Build the attribute set and the measurement option once: all five
	// instruments below are recorded with identical attributes, so there is
	// no need to reconstruct the set for each call.
	ctx := context.Background()
	withAttrs := metric.WithAttributeSet(attribute.NewSet(attrs...))

	records := int64(m.NumRecords)
	uncompressed := int64(m.UncompressedBytes)

	h.messageProduced.Add(ctx, records, withAttrs)
	h.messageProducedWireBytes.Add(ctx, int64(m.CompressedBytes), withAttrs)
	h.messageProducedUncompressedBytes.Add(ctx, uncompressed, withAttrs)
	// kotel metrics
	h.produceBytes.Add(ctx, uncompressed, withAttrs)
	h.produceRecords.Add(ctx, records, withAttrs)
}