func()

in kafka/metrics.go [447:485]


// OnProduceBatchWritten records the produce metrics for one written batch of
// the given topic and partition. Presumably this satisfies franz-go's
// produce-batch-written hook; confirm against the kgo hook interfaces.
//
// Every measurement carries the same attributes: the messaging system, the
// raw topic name, the topic name with h.topicPrefix trimmed, the partition,
// a "success" outcome and the batch compression codec. The attribute returned
// by h.topicAttributeFunc is added when it is not the zero KeyValue, and the
// namespace is added when h.namespace is not empty.
//
// Counters updated from m:
//   - messageProduced and produceRecords: m.NumRecords
//   - messageProducedWireBytes: m.CompressedBytes
//   - messageProducedUncompressedBytes and produceBytes: m.UncompressedBytes
func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata,
	topic string, partition int32, m kgo.ProduceBatchMetrics,
) {
	// Six fixed attributes plus up to two optional ones (topic attribute and
	// namespace); size for the worst case so append never reallocates.
	attrs := make([]attribute.KeyValue, 0, 8)
	attrs = append(attrs, semconv.MessagingSystem("kafka"),
		attribute.String("topic", topic),
		semconv.MessagingDestinationName(strings.TrimPrefix(topic, h.topicPrefix)),
		semconv.MessagingKafkaDestinationPartition(int(partition)),
		attribute.String("outcome", "success"),
		attribute.String("compression.codec", compressionFromCodec(m.CompressionType)),
	)
	if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
		attrs = append(attrs, kv)
	}
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	// All counters share the same attributes, so build the attribute set and
	// the measurement option once instead of once per counter.
	ctx := context.Background()
	withAttrs := metric.WithAttributeSet(attribute.NewSet(attrs...))

	records := int64(m.NumRecords)
	uncompressedBytes := int64(m.UncompressedBytes)

	h.messageProduced.Add(ctx, records, withAttrs)
	h.messageProducedWireBytes.Add(ctx, int64(m.CompressedBytes), withAttrs)
	h.messageProducedUncompressedBytes.Add(ctx, uncompressedBytes, withAttrs)
	// kotel metrics
	h.produceBytes.Add(ctx, uncompressedBytes, withAttrs)
	h.produceRecords.Add(ctx, records, withAttrs)
}