in kafka/metrics.go [385:415]
// OnBrokerWrite records write metrics for a single request written to a
// broker.
//
// Every measurement carries the messaging system ("kafka") and the request
// name derived from key as the "operation" attribute, plus a "namespace"
// attribute when h.namespace is non-empty.
//
// When err is non-nil, writeErrs is incremented by one and bytesWritten is
// not counted; otherwise bytesWritten is added to writeBytes. In both cases
// the sum of writeWait and timeToWrite, in seconds, is recorded to
// messageWriteLatency with an additional "outcome" attribute set to
// "success" or "failure".
func (h *metricHooks) OnBrokerWrite(_ kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
	// Up to four attributes are appended below (system, operation, optional
	// namespace, outcome), so size the slice for all of them to avoid
	// regrowing it on the final append.
	attrs := make([]attribute.KeyValue, 0, 4)
	attrs = append(attrs,
		semconv.MessagingSystem("kafka"),
		attribute.String("operation", kmsg.NameForKey(key)),
	)
	if h.namespace != "" {
		attrs = append(attrs, attribute.String("namespace", h.namespace))
	}

	// Both counters use the same attributes (without "outcome"), so the set
	// is built once and shared by whichever branch runs.
	counterOpt := metric.WithAttributeSet(attribute.NewSet(attrs...))

	outcome := "success"
	if err != nil {
		outcome = "failure"
		h.writeErrs.Add(context.Background(), 1, counterOpt)
	} else {
		h.writeBytes.Add(context.Background(), int64(bytesWritten), counterOpt)
	}

	// Only the latency measurement is broken down by outcome.
	attrs = append(attrs, attribute.String("outcome", outcome))
	h.messageWriteLatency.Record(
		context.Background(),
		writeWait.Seconds()+timeToWrite.Seconds(),
		metric.WithAttributeSet(attribute.NewSet(attrs...)),
	)
}