in kafka/metrics.go [109:338]
// newKgoHooks builds the metricHooks value holding every counter and histogram
// registered below. All instruments are created on the meter that mp returns
// for instrumentName.
//
// namespace, topicPrefix and topicAttributeFunc are stored on the result
// unchanged. Instruments are created one at a time in a fixed order; the first
// creation failure stops construction and is returned wrapped, together with a
// nil *metricHooks.
func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
	topicAttributeFunc TopicAttributeFunc,
) (*metricHooks, error) {
	meter := mp.Meter(instrumentName)

	// Local constructors for the three instrument shapes used in this
	// function. Each passes its options in the order written here.
	kotelCounter := func(name, unit, desc string) (metric.Int64Counter, error) {
		return meter.Int64Counter(name,
			metric.WithUnit(unit),
			metric.WithDescription(desc),
		)
	}
	customCounter := func(name, desc, unit string) (metric.Int64Counter, error) {
		return meter.Int64Counter(name,
			metric.WithDescription(desc),
			metric.WithUnit(unit),
		)
	}
	secondsHistogram := func(name, desc string) (metric.Float64Histogram, error) {
		return meter.Float64Histogram(name,
			metric.WithDescription(desc),
			metric.WithUnit("s"),
		)
	}
	// kotelErr wraps a creation failure for a kotel metric, identifying the
	// instrument by the short label used in the message.
	kotelErr := func(label string, cause error) error {
		return fmt.Errorf("failed to create %s instrument, %w", label, cause)
	}

	hooks := &metricHooks{
		namespace:          namespace,
		topicPrefix:        topicPrefix,
		topicAttributeFunc: topicAttributeFunc,
	}
	var err error

	// kotel metrics: connects and disconnects.
	if hooks.connects, err = kotelCounter(
		"messaging.kafka.connects.count", unitCount,
		"Total number of connections opened, by broker",
	); err != nil {
		return nil, kotelErr("connects", err)
	}
	if hooks.connectErrs, err = kotelCounter(
		"messaging.kafka.connect_errors.count", unitCount,
		"Total number of connection errors, by broker",
	); err != nil {
		return nil, kotelErr("connectErrs", err)
	}
	if hooks.disconnects, err = kotelCounter(
		"messaging.kafka.disconnects.count", unitCount,
		"Total number of connections closed, by broker",
	); err != nil {
		return nil, kotelErr("disconnects", err)
	}

	// kotel metrics: write.
	if hooks.writeErrs, err = kotelCounter(
		"messaging.kafka.write_errors.count", unitCount,
		"Total number of write errors, by broker",
	); err != nil {
		return nil, kotelErr("writeErrs", err)
	}
	if hooks.writeBytes, err = kotelCounter(
		"messaging.kafka.write_bytes", unitBytes,
		"Total number of bytes written, by broker",
	); err != nil {
		return nil, kotelErr("writeBytes", err)
	}

	// kotel metrics: read.
	if hooks.readErrs, err = kotelCounter(
		"messaging.kafka.read_errors.count", unitCount,
		"Total number of read errors, by broker",
	); err != nil {
		return nil, kotelErr("readErrs", err)
	}
	if hooks.readBytes, err = kotelCounter(
		"messaging.kafka.read_bytes.count", unitBytes,
		"Total number of bytes read, by broker",
	); err != nil {
		return nil, kotelErr("readBytes", err)
	}

	// kotel metrics: produce & consume.
	if hooks.produceBytes, err = kotelCounter(
		"messaging.kafka.produce_bytes.count", unitBytes,
		"Total number of uncompressed bytes produced, by broker and topic",
	); err != nil {
		return nil, kotelErr("produceBytes", err)
	}
	if hooks.produceRecords, err = kotelCounter(
		"messaging.kafka.produce_records.count", unitCount,
		"Total number of produced records, by broker and topic",
	); err != nil {
		return nil, kotelErr("produceRecords", err)
	}
	if hooks.fetchBytes, err = kotelCounter(
		"messaging.kafka.fetch_bytes.count", unitBytes,
		"Total number of uncompressed bytes fetched, by broker and topic",
	); err != nil {
		return nil, kotelErr("fetchBytes", err)
	}
	if hooks.fetchRecords, err = kotelCounter(
		"messaging.kafka.fetch_records.count", unitCount,
		"Total number of fetched records, by broker and topic",
	); err != nil {
		return nil, kotelErr("fetchRecords", err)
	}

	// custom metrics: producer.
	if hooks.messageProduced, err = customCounter(msgProducedCountKey,
		"The number of messages produced", unitCount,
	); err != nil {
		return nil, formatMetricError(msgProducedCountKey, err)
	}
	if hooks.messageProducedWireBytes, err = customCounter(msgProducedWireBytesKey,
		"The number of bytes produced", unitBytes,
	); err != nil {
		return nil, formatMetricError(msgProducedWireBytesKey, err)
	}
	if hooks.messageProducedUncompressedBytes, err = customCounter(msgProducedUncompressedBytesKey,
		"The number of uncompressed bytes produced", unitBytes,
	); err != nil {
		return nil, formatMetricError(msgProducedUncompressedBytesKey, err)
	}
	if hooks.messageWriteLatency, err = secondsHistogram(messageWriteLatencyKey,
		"Time it took to write a batch including wait time before writing",
	); err != nil {
		return nil, formatMetricError(messageWriteLatencyKey, err)
	}

	// custom metrics: consumer.
	if hooks.messageFetched, err = customCounter(msgFetchedKey,
		"The number of messages that were fetched from a kafka topic", unitCount,
	); err != nil {
		return nil, formatMetricError(msgFetchedKey, err)
	}
	if hooks.messageFetchedWireBytes, err = customCounter(msgConsumedWireBytesKey,
		"The number of bytes consumed", unitBytes,
	); err != nil {
		return nil, formatMetricError(msgConsumedWireBytesKey, err)
	}
	if hooks.messageFetchedUncompressedBytes, err = customCounter(msgConsumedUncompressedBytesKey,
		"The number of uncompressed bytes consumed", unitBytes,
	); err != nil {
		return nil, formatMetricError(msgConsumedUncompressedBytesKey, err)
	}
	if hooks.messageReadLatency, err = secondsHistogram(messageReadLatencyKey,
		"Time it took to read a batch including wait time before reading",
	); err != nil {
		return nil, formatMetricError(messageReadLatencyKey, err)
	}
	if hooks.messageDelay, err = secondsHistogram(msgDelayKey,
		"The delay between producing messages and reading them",
	); err != nil {
		return nil, formatMetricError(msgDelayKey, err)
	}
	if hooks.throttlingDuration, err = secondsHistogram(throttlingDurationKey,
		"The throttling interval imposed by the broker",
	); err != nil {
		return nil, formatMetricError(throttlingDurationKey, err)
	}

	return hooks, nil
}