// kafka/metrics.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kafka

import (
	"context"
	"errors"
	"fmt"
	"net"
	"strings"
	"time"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
const (
instrumentName = "github.com/elastic/apm-queue/kafka"
unitCount = "1"
unitBytes = "By"
msgProducedCountKey = "producer.messages.count"
msgProducedWireBytesKey = "producer.messages.wire.bytes"
msgProducedUncompressedBytesKey = "producer.messages.uncompressed.bytes"
msgFetchedKey = "consumer.messages.fetched"
msgDelayKey = "consumer.messages.delay"
msgConsumedWireBytesKey = "consumer.messages.wire.bytes"
msgConsumedUncompressedBytesKey = "consumer.messages.uncompressed.bytes"
throttlingDurationKey = "messaging.kafka.throttling.duration"
messageWriteLatencyKey = "messaging.kafka.write.latency"
messageReadLatencyKey = "messaging.kafka.read.latency"
errorReasonKey = "error_reason"
)
var (
_ kgo.HookBrokerConnect = new(metricHooks)
_ kgo.HookBrokerDisconnect = new(metricHooks)
_ kgo.HookBrokerWrite = new(metricHooks)
_ kgo.HookBrokerRead = new(metricHooks)
_ kgo.HookProduceBatchWritten = new(metricHooks)
_ kgo.HookFetchBatchRead = new(metricHooks)
_ kgo.HookFetchRecordUnbuffered = new(metricHooks)
_ kgo.HookProduceRecordUnbuffered = new(metricHooks)
_ kgo.HookBrokerThrottle = new(metricHooks)
)
// TopicAttributeFunc runs on `kgo.HookProduceBatchWritten` and
// `kgo.HookFetchBatchRead` for each topic/partition. It can be used
// to include additional dimensions for the `consumer.messages.fetched`
// and `producer.messages.count` metrics.
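//
// A minimal sketch of a hypothetical implementation (the "tier" attribute
// and the topic naming scheme are illustrative, not defined by this
// package):
//
//	func tierAttribute(topic string) attribute.KeyValue {
//		if strings.HasPrefix(topic, "priority-") {
//			return attribute.String("tier", "priority")
//		}
//		// The zero KeyValue is skipped by the hooks.
//		return attribute.KeyValue{}
//	}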
type TopicAttributeFunc func(topic string) attribute.KeyValue
type metricHooks struct {
namespace string
topicPrefix string
// kotel metrics
connects metric.Int64Counter
connectErrs metric.Int64Counter
disconnects metric.Int64Counter
writeErrs metric.Int64Counter
writeBytes metric.Int64Counter
readErrs metric.Int64Counter
readBytes metric.Int64Counter
produceBytes metric.Int64Counter
produceRecords metric.Int64Counter
fetchBytes metric.Int64Counter
fetchRecords metric.Int64Counter
// custom metrics
messageProduced metric.Int64Counter
messageProducedWireBytes metric.Int64Counter
messageProducedUncompressedBytes metric.Int64Counter
messageWriteLatency metric.Float64Histogram
messageFetched metric.Int64Counter
messageFetchedWireBytes metric.Int64Counter
messageFetchedUncompressedBytes metric.Int64Counter
messageReadLatency metric.Float64Histogram
messageDelay metric.Float64Histogram
throttlingDuration metric.Float64Histogram
topicAttributeFunc TopicAttributeFunc
}
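// newKgoHooks creates the kotel-style and custom metric instruments on the
// given metric.MeterProvider and returns hooks implementing the kgo hook
// interfaces asserted above.
//
// A minimal usage sketch, assuming mp is an existing metric.MeterProvider
// (the broker address, namespace, and topic prefix are illustrative):
//
//	hooks, err := newKgoHooks(mp, "my-namespace", "prefix-",
//		func(string) attribute.KeyValue { return attribute.KeyValue{} },
//	)
//	if err != nil {
//		return err
//	}
//	client, err := kgo.NewClient(
//		kgo.SeedBrokers("localhost:9092"),
//		kgo.WithHooks(hooks),
//	)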
func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
topicAttributeFunc TopicAttributeFunc,
) (*metricHooks, error) {
m := mp.Meter(instrumentName)
// kotel metrics
// connects and disconnects
connects, err := m.Int64Counter(
"messaging.kafka.connects.count",
metric.WithUnit(unitCount),
metric.WithDescription("Total number of connections opened, by broker"),
)
if err != nil {
return nil, fmt.Errorf("failed to create connects instrument, %w", err)
}
connectErrs, err := m.Int64Counter(
"messaging.kafka.connect_errors.count",
metric.WithUnit(unitCount),
metric.WithDescription("Total number of connection errors, by broker"),
)
if err != nil {
return nil, fmt.Errorf("failed to create connectErrs instrument, %w", err)
}
disconnects, err := m.Int64Counter(
"messaging.kafka.disconnects.count",
metric.WithUnit(unitCount),
metric.WithDescription("Total number of connections closed, by broker"),
)
if err != nil {
return nil, fmt.Errorf("failed to create disconnects instrument, %w", err)
}
// write
writeErrs, err := m.Int64Counter(
"messaging.kafka.write_errors.count",
metric.WithUnit(unitCount),
metric.WithDescription("Total number of write errors, by broker"),
)
if err != nil {
return nil, fmt.Errorf("failed to create writeErrs instrument, %w", err)
}
writeBytes, err := m.Int64Counter(
"messaging.kafka.write_bytes",
metric.WithUnit(unitBytes),
metric.WithDescription("Total number of bytes written, by broker"),
)
if err != nil {
return nil, fmt.Errorf("failed to create writeBytes instrument, %w", err)
}
// read
readErrs, err := m.Int64Counter(
"messaging.kafka.read_errors.count",
metric.WithUnit(unitCount),
metric.WithDescription("Total number of read errors, by broker"),
)
if err != nil {
return nil, fmt.Errorf("failed to create readErrs instrument, %w", err)
}
readBytes, err := m.Int64Counter(
"messaging.kafka.read_bytes.count",
metric.WithUnit(unitBytes),
metric.WithDescription("Total number of bytes read, by broker"),
)
if err != nil {
return nil, fmt.Errorf("failed to create readBytes instrument, %w", err)
}
// produce & consume
produceBytes, err := m.Int64Counter(
"messaging.kafka.produce_bytes.count",
metric.WithUnit(unitBytes),
metric.WithDescription("Total number of uncompressed bytes produced, by broker and topic"),
)
if err != nil {
return nil, fmt.Errorf("failed to create produceBytes instrument, %w", err)
}
produceRecords, err := m.Int64Counter(
"messaging.kafka.produce_records.count",
metric.WithUnit(unitCount),
metric.WithDescription("Total number of produced records, by broker and topic"),
)
if err != nil {
return nil, fmt.Errorf("failed to create produceRecords instrument, %w", err)
}
fetchBytes, err := m.Int64Counter(
"messaging.kafka.fetch_bytes.count",
metric.WithUnit(unitBytes),
metric.WithDescription("Total number of uncompressed bytes fetched, by broker and topic"),
)
if err != nil {
return nil, fmt.Errorf("failed to create fetchBytes instrument, %w", err)
}
fetchRecords, err := m.Int64Counter(
"messaging.kafka.fetch_records.count",
metric.WithUnit(unitCount),
metric.WithDescription("Total number of fetched records, by broker and topic"),
)
if err != nil {
return nil, fmt.Errorf("failed to create fetchRecords instrument, %w", err)
}
// custom metrics
messageProducedCounter, err := m.Int64Counter(msgProducedCountKey,
metric.WithDescription("The number of messages produced"),
metric.WithUnit(unitCount),
)
if err != nil {
return nil, formatMetricError(msgProducedCountKey, err)
}
messageProducedWireBytes, err := m.Int64Counter(msgProducedWireBytesKey,
metric.WithDescription("The number of bytes produced"),
metric.WithUnit(unitBytes),
)
if err != nil {
return nil, formatMetricError(msgProducedWireBytesKey, err)
}
messageProducedUncompressedBytes, err := m.Int64Counter(msgProducedUncompressedBytesKey,
metric.WithDescription("The number of uncompressed bytes produced"),
metric.WithUnit(unitBytes),
)
if err != nil {
return nil, formatMetricError(msgProducedUncompressedBytesKey, err)
}
messageWriteLatency, err := m.Float64Histogram(messageWriteLatencyKey,
metric.WithDescription("Time it took to write a batch including wait time before writing"),
metric.WithUnit("s"),
)
if err != nil {
return nil, formatMetricError(messageWriteLatencyKey, err)
}
messageFetchedCounter, err := m.Int64Counter(msgFetchedKey,
metric.WithDescription("The number of messages that were fetched from a kafka topic"),
metric.WithUnit(unitCount),
)
if err != nil {
return nil, formatMetricError(msgFetchedKey, err)
}
messageFetchedWireBytes, err := m.Int64Counter(msgConsumedWireBytesKey,
metric.WithDescription("The number of bytes consumed"),
metric.WithUnit(unitBytes),
)
if err != nil {
return nil, formatMetricError(msgConsumedWireBytesKey, err)
}
messageFetchedUncompressedBytes, err := m.Int64Counter(msgConsumedUncompressedBytesKey,
metric.WithDescription("The number of uncompressed bytes consumed"),
metric.WithUnit(unitBytes),
)
if err != nil {
return nil, formatMetricError(msgConsumedUncompressedBytesKey, err)
}
messageReadLatency, err := m.Float64Histogram(messageReadLatencyKey,
metric.WithDescription("Time it took to read a batch including wait time before reading"),
metric.WithUnit("s"),
)
if err != nil {
return nil, formatMetricError(messageReadLatencyKey, err)
}
messageDelayHistogram, err := m.Float64Histogram(msgDelayKey,
metric.WithDescription("The delay between producing messages and reading them"),
metric.WithUnit("s"),
)
if err != nil {
return nil, formatMetricError(msgDelayKey, err)
}
throttlingDurationHistogram, err := m.Float64Histogram(throttlingDurationKey,
metric.WithDescription("The throttling interval imposed by the broker"),
metric.WithUnit("s"),
)
if err != nil {
return nil, formatMetricError(throttlingDurationKey, err)
}
return &metricHooks{
namespace: namespace,
topicPrefix: topicPrefix,
// kotel metrics
connects: connects,
connectErrs: connectErrs,
disconnects: disconnects,
writeErrs: writeErrs,
writeBytes: writeBytes,
readErrs: readErrs,
readBytes: readBytes,
produceBytes: produceBytes,
produceRecords: produceRecords,
fetchBytes: fetchBytes,
fetchRecords: fetchRecords,
// custom metrics
// Producer
messageProduced: messageProducedCounter,
messageProducedWireBytes: messageProducedWireBytes,
messageProducedUncompressedBytes: messageProducedUncompressedBytes,
messageWriteLatency: messageWriteLatency,
// Consumer
messageFetched: messageFetchedCounter,
messageFetchedWireBytes: messageFetchedWireBytes,
messageFetchedUncompressedBytes: messageFetchedUncompressedBytes,
messageReadLatency: messageReadLatency,
messageDelay: messageDelayHistogram,
throttlingDuration: throttlingDurationHistogram,
topicAttributeFunc: topicAttributeFunc,
}, nil
}
func formatMetricError(name string, err error) error {
return fmt.Errorf("cannot create %s metric: %w", name, err)
}
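// OnBrokerConnect records connection attempts: every attempt increments the
// connects counter with an "outcome" attribute, and failures also increment
// the connect-errors counter.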
func (h *metricHooks) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
attrs := make([]attribute.KeyValue, 0, 3)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
if err != nil {
h.connectErrs.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
attrs = append(attrs, attribute.String("outcome", "failure"))
h.connects.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
return
}
attrs = append(attrs, attribute.String("outcome", "success"))
h.connects.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
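// OnBrokerDisconnect records closed broker connections.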
func (h *metricHooks) OnBrokerDisconnect(kgo.BrokerMetadata, net.Conn) {
attrs := make([]attribute.KeyValue, 0, 2)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
h.disconnects.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
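// OnBrokerWrite records bytes written (tagged with the request name) on
// success, write errors on failure, and the write latency either way.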
func (h *metricHooks) OnBrokerWrite(_ kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
attrs := make([]attribute.KeyValue, 0, 3)
attrs = append(attrs,
semconv.MessagingSystem("kafka"),
attribute.String("operation", kmsg.NameForKey(key)),
)
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
outcome := "success"
if err != nil {
outcome = "failure"
h.writeErrs.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
} else {
h.writeBytes.Add(
context.Background(),
int64(bytesWritten),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
attrs = append(attrs, attribute.String("outcome", outcome))
h.messageWriteLatency.Record(
context.Background(),
writeWait.Seconds()+timeToWrite.Seconds(),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
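// OnBrokerRead records bytes read on success, read errors on failure, and
// the read latency either way.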
func (h *metricHooks) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
attrs := make([]attribute.KeyValue, 0, 3)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
outcome := "success"
if err != nil {
outcome = "failure"
h.readErrs.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
} else {
h.readBytes.Add(
context.Background(),
int64(bytesRead),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
attrs = append(attrs, attribute.String("outcome", outcome))
h.messageReadLatency.Record(
context.Background(),
readWait.Seconds()+timeToRead.Seconds(),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
// OnProduceBatchWritten is called once per produced batch. It records the
// number of produced messages and their compressed and uncompressed sizes.
func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata,
topic string, partition int32, m kgo.ProduceBatchMetrics,
) {
attrs := make([]attribute.KeyValue, 0, 7)
attrs = append(attrs, semconv.MessagingSystem("kafka"),
attribute.String("topic", topic),
semconv.MessagingDestinationName(strings.TrimPrefix(topic, h.topicPrefix)),
semconv.MessagingKafkaDestinationPartition(int(partition)),
attribute.String("outcome", "success"),
attribute.String("compression.codec", compressionFromCodec(m.CompressionType)),
)
if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
h.messageProduced.Add(context.Background(), int64(m.NumRecords),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
h.messageProducedWireBytes.Add(context.Background(), int64(m.CompressedBytes),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
h.messageProducedUncompressedBytes.Add(context.Background(), int64(m.UncompressedBytes),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
// kotel metrics
h.produceBytes.Add(
context.Background(),
int64(m.UncompressedBytes),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
h.produceRecords.Add(
context.Background(),
int64(m.NumRecords),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
// OnFetchBatchRead is called once per batch read from Kafka. It records the
// number of fetched messages and their compressed and uncompressed sizes.
func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata,
topic string, partition int32, m kgo.FetchBatchMetrics,
) {
attrs := make([]attribute.KeyValue, 0, 6)
attrs = append(attrs, semconv.MessagingSystem("kafka"),
attribute.String("topic", topic),
semconv.MessagingSourceName(strings.TrimPrefix(topic, h.topicPrefix)),
semconv.MessagingKafkaSourcePartition(int(partition)),
attribute.String("compression.codec", compressionFromCodec(m.CompressionType)),
)
if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
h.messageFetched.Add(context.Background(), int64(m.NumRecords),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
h.messageFetchedWireBytes.Add(context.Background(), int64(m.CompressedBytes),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
h.messageFetchedUncompressedBytes.Add(context.Background(), int64(m.UncompressedBytes),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
h.fetchBytes.Add(
context.Background(),
int64(m.UncompressedBytes),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
h.fetchRecords.Add(
context.Background(),
int64(m.NumRecords),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
// OnProduceRecordUnbuffered records the number of messages that failed to
// be produced. Successfully produced records are recorded by
// OnProduceBatchWritten.
// https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#HookProduceRecordUnbuffered
func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
if err == nil {
return // Covered by OnProduceBatchWritten.
}
attrs := attributesFromRecord(r,
attribute.String("topic", r.Topic),
semconv.MessagingDestinationName(strings.TrimPrefix(r.Topic, h.topicPrefix)),
semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
attribute.String("outcome", "failure"),
)
if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
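	// Map the error to a bounded set of error_reason values to keep the
	// metric cardinality low; kerr errors use the broker error message.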
var kgoErr *kerr.Error
switch {
case errors.Is(err, context.DeadlineExceeded):
attrs = append(attrs, attribute.String(errorReasonKey, "timeout"))
case errors.Is(err, context.Canceled):
attrs = append(attrs, attribute.String(errorReasonKey, "canceled"))
case errors.As(err, &kgoErr):
attrs = append(attrs, attribute.String(errorReasonKey, kgoErr.Message))
default:
attrs = append(attrs, attribute.String(errorReasonKey, "unknown"))
}
h.messageProduced.Add(context.Background(), 1, metric.WithAttributeSet(
attribute.NewSet(attrs...),
))
}
// OnFetchRecordUnbuffered records the message delay of fetched messages.
// https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#HookFetchRecordUnbuffered
func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) {
if !polled {
return // Record metrics when polled by `client.PollRecords()`.
}
attrs := attributesFromRecord(r,
attribute.String("topic", r.Topic),
semconv.MessagingSourceName(strings.TrimPrefix(r.Topic, h.topicPrefix)),
semconv.MessagingKafkaSourcePartition(int(r.Partition)),
)
if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) {
attrs = append(attrs, kv)
}
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
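	// r.Timestamp is the producer-assigned timestamp (assuming the topic
	// uses CreateTime rather than LogAppendTime), so the elapsed time
	// approximates the end-to-end delivery delay.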
h.messageDelay.Record(context.Background(),
time.Since(r.Timestamp).Seconds(),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
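// OnBrokerThrottle records the throttling interval imposed by the broker.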
func (h *metricHooks) OnBrokerThrottle(_ kgo.BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool) {
attrs := make([]attribute.KeyValue, 0, 2)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
h.throttlingDuration.Record(context.Background(),
throttleInterval.Seconds(),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
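// attributesFromRecord returns the base attributes for a record: the
// messaging system, any extra attributes, and the record headers, except
// traceparent, which carries trace context rather than a metric dimension.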
func attributesFromRecord(r *kgo.Record, extra ...attribute.KeyValue) []attribute.KeyValue {
attrs := make([]attribute.KeyValue, 0, 5) // Preallocate 5 elements.
attrs = append(attrs, semconv.MessagingSystem("kafka"))
attrs = append(attrs, extra...)
for _, v := range r.Headers {
if v.Key == "traceparent" { // Ignore traceparent.
continue
}
attrs = append(attrs, attribute.String(v.Key, string(v.Value)))
}
return attrs
}
// compressionFromCodec maps the kgo batch CompressionType to a codec name:
// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is zstd.
func compressionFromCodec(c uint8) string {
switch c {
case 0:
return "none"
case 1:
return "gzip"
case 2:
return "snappy"
case 3:
return "lz4"
case 4:
return "zstd"
default:
return "unknown"
}
}