func newKgoHooks()

in kafka/metrics.go [109:338]


func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
	topicAttributeFunc TopicAttributeFunc,
) (*metricHooks, error) {
	m := mp.Meter(instrumentName)

	// kotel metrics

	// connects and disconnects
	connects, err := m.Int64Counter(
		"messaging.kafka.connects.count",
		metric.WithUnit(unitCount),
		metric.WithDescription("Total number of connections opened, by broker"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create connects instrument, %w", err)
	}

	connectErrs, err := m.Int64Counter(
		"messaging.kafka.connect_errors.count",
		metric.WithUnit(unitCount),
		metric.WithDescription("Total number of connection errors, by broker"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create connectErrs instrument, %w", err)
	}

	disconnects, err := m.Int64Counter(
		"messaging.kafka.disconnects.count",
		metric.WithUnit(unitCount),
		metric.WithDescription("Total number of connections closed, by broker"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create disconnects instrument, %w", err)
	}

	// write

	writeErrs, err := m.Int64Counter(
		"messaging.kafka.write_errors.count",
		metric.WithUnit(unitCount),
		metric.WithDescription("Total number of write errors, by broker"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create writeErrs instrument, %w", err)
	}

	writeBytes, err := m.Int64Counter(
		"messaging.kafka.write_bytes",
		metric.WithUnit(unitBytes),
		metric.WithDescription("Total number of bytes written, by broker"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create writeBytes instrument, %w", err)
	}

	// read

	readErrs, err := m.Int64Counter(
		"messaging.kafka.read_errors.count",
		metric.WithUnit(unitCount),
		metric.WithDescription("Total number of read errors, by broker"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create readErrs instrument, %w", err)
	}

	readBytes, err := m.Int64Counter(
		"messaging.kafka.read_bytes.count",
		metric.WithUnit(unitBytes),
		metric.WithDescription("Total number of bytes read, by broker"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create readBytes instrument, %w", err)
	}

	// produce & consume

	produceBytes, err := m.Int64Counter(
		"messaging.kafka.produce_bytes.count",
		metric.WithUnit(unitBytes),
		metric.WithDescription("Total number of uncompressed bytes produced, by broker and topic"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create produceBytes instrument, %w", err)
	}

	produceRecords, err := m.Int64Counter(
		"messaging.kafka.produce_records.count",
		metric.WithUnit(unitCount),
		metric.WithDescription("Total number of produced records, by broker and topic"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create produceRecords instrument, %w", err)
	}

	fetchBytes, err := m.Int64Counter(
		"messaging.kafka.fetch_bytes.count",
		metric.WithUnit(unitBytes),
		metric.WithDescription("Total number of uncompressed bytes fetched, by broker and topic"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create fetchBytes instrument, %w", err)
	}

	fetchRecords, err := m.Int64Counter(
		"messaging.kafka.fetch_records.count",
		metric.WithUnit(unitCount),
		metric.WithDescription("Total number of fetched records, by broker and topic"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create fetchRecords instrument, %w", err)
	}

	// custom metrics
	messageProducedCounter, err := m.Int64Counter(msgProducedCountKey,
		metric.WithDescription("The number of messages produced"),
		metric.WithUnit(unitCount),
	)
	if err != nil {
		return nil, formatMetricError(msgProducedCountKey, err)
	}
	messageProducedWireBytes, err := m.Int64Counter(msgProducedWireBytesKey,
		metric.WithDescription("The number of bytes produced"),
		metric.WithUnit(unitBytes),
	)
	if err != nil {
		return nil, formatMetricError(msgProducedWireBytesKey, err)
	}

	messageProducedUncompressedBytes, err := m.Int64Counter(msgProducedUncompressedBytesKey,
		metric.WithDescription("The number of uncompressed bytes produced"),
		metric.WithUnit(unitBytes),
	)
	if err != nil {
		return nil, formatMetricError(msgProducedUncompressedBytesKey, err)
	}

	messageWriteLatency, err := m.Float64Histogram(messageWriteLatencyKey,
		metric.WithDescription("Time it took to write a batch including wait time before writing"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return nil, formatMetricError(messageWriteLatencyKey, err)
	}

	messageFetchedCounter, err := m.Int64Counter(msgFetchedKey,
		metric.WithDescription("The number of messages that were fetched from a kafka topic"),
		metric.WithUnit(unitCount),
	)
	if err != nil {
		return nil, formatMetricError(msgFetchedKey, err)
	}

	messageFetchedWireBytes, err := m.Int64Counter(msgConsumedWireBytesKey,
		metric.WithDescription("The number of bytes consumed"),
		metric.WithUnit(unitBytes),
	)
	if err != nil {
		return nil, formatMetricError(msgConsumedWireBytesKey, err)
	}

	messageFetchedUncompressedBytes, err := m.Int64Counter(msgConsumedUncompressedBytesKey,
		metric.WithDescription("The number of uncompressed bytes consumed"),
		metric.WithUnit(unitBytes),
	)
	if err != nil {
		return nil, formatMetricError(msgConsumedUncompressedBytesKey, err)
	}

	messageReadLatency, err := m.Float64Histogram(messageReadLatencyKey,
		metric.WithDescription("Time it took to read a batch including wait time before reading"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return nil, formatMetricError(messageReadLatencyKey, err)
	}

	messageDelayHistogram, err := m.Float64Histogram(msgDelayKey,
		metric.WithDescription("The delay between producing messages and reading them"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return nil, formatMetricError(msgDelayKey, err)
	}

	throttlingDurationHistogram, err := m.Float64Histogram(throttlingDurationKey,
		metric.WithDescription("The throttling interval imposed by the broker"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return nil, formatMetricError(throttlingDurationKey, err)
	}

	return &metricHooks{
		namespace:   namespace,
		topicPrefix: topicPrefix,
		// kotel metrics
		connects:    connects,
		connectErrs: connectErrs,
		disconnects: disconnects,

		writeErrs:  writeErrs,
		writeBytes: writeBytes,

		readErrs:  readErrs,
		readBytes: readBytes,

		produceBytes:   produceBytes,
		produceRecords: produceRecords,
		fetchBytes:     fetchBytes,
		fetchRecords:   fetchRecords,

		// custom metrics

		// Producer
		messageProduced:                  messageProducedCounter,
		messageProducedWireBytes:         messageProducedWireBytes,
		messageProducedUncompressedBytes: messageProducedUncompressedBytes,
		messageWriteLatency:              messageWriteLatency,
		// Consumer
		messageFetched:                  messageFetchedCounter,
		messageFetchedWireBytes:         messageFetchedWireBytes,
		messageFetchedUncompressedBytes: messageFetchedUncompressedBytes,
		messageReadLatency:              messageReadLatency,
		messageDelay:                    messageDelayHistogram,
		throttlingDuration:              throttlingDurationHistogram,

		topicAttributeFunc: topicAttributeFunc,
	}, nil
}