func()

in kafka/metrics.go [385:415]


// OnBrokerWrite records metrics for a single write to a Kafka broker.
//
// Every data point carries the messaging system ("kafka"), an "operation"
// attribute holding the request name resolved from key via kmsg.NameForKey
// and, when h.namespace is non-empty, a "namespace" attribute.
//
// When err is non-nil, writeErrs is incremented by one; otherwise
// bytesWritten is added to writeBytes. In both cases the sum of writeWait and
// timeToWrite, in seconds, is recorded on messageWriteLatency with an
// additional "outcome" attribute of "success" or "failure".
//
// NOTE(review): presumably invoked by franz-go as the kgo.HookBrokerWrite
// hook — confirm against where the hooks are registered.
func (h *metricHooks) OnBrokerWrite(_ kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
	// Up to four attributes are attached (system, operation, optional
	// namespace, outcome); sizing for all of them avoids re-growing the slice
	// when the outcome is appended below.
	attrs := make([]attribute.KeyValue, 0, 4)
	attrs = append(attrs,
		semconv.MessagingSystem("kafka"),
		attribute.String("operation", kmsg.NameForKey(key)),
	)
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	ctx := context.Background()
	// The counters are recorded without the outcome attribute, so their
	// attribute set is built before the outcome is appended.
	counterAttrs := metric.WithAttributeSet(attribute.NewSet(attrs...))

	outcome := "success"
	if err != nil {
		outcome = "failure"
		h.writeErrs.Add(ctx, 1, counterAttrs)
	} else {
		h.writeBytes.Add(ctx, int64(bytesWritten), counterAttrs)
	}

	attrs = append(attrs, attribute.String("outcome", outcome))
	h.messageWriteLatency.Record(
		ctx,
		writeWait.Seconds()+timeToWrite.Seconds(),
		metric.WithAttributeSet(attribute.NewSet(attrs...)),
	)
}