func()

in pulsar/internal/metrics.go [541:593]


func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics {
	labels := make(map[string]string, 3)
	tn, err := ParseTopicName(t)
	if err != nil {
		return nil
	}
	topic := TopicNameWithoutPartitionPart(tn)
	switch mp.metricsLevel {
	case 4:
		labels["topic"] = topic
		fallthrough
	case 3:
		labels["pulsar_namespace"] = tn.Namespace
		fallthrough
	case 2:
		labels["pulsar_tenant"] = tn.Tenant
	}

	lm := &LeveledMetrics{
		MessagesPublished:        mp.messagesPublished.With(labels),
		BytesPublished:           mp.bytesPublished.With(labels),
		MessagesPending:          mp.messagesPending.With(labels),
		BytesPending:             mp.bytesPending.With(labels),
		PublishErrorsTimeout:     mp.publishErrors.With(mergeMaps(labels, map[string]string{"error": "timeout"})),
		PublishErrorsMsgTooLarge: mp.publishErrors.With(mergeMaps(labels, map[string]string{"error": "msg_too_large"})),
		PublishLatency:           mp.publishLatency.With(labels),
		PublishRPCLatency:        mp.publishRPCLatency.With(labels),

		MessagesReceived:   mp.messagesReceived.With(labels),
		BytesReceived:      mp.bytesReceived.With(labels),
		PrefetchedMessages: mp.prefetchedMessages.With(labels),
		PrefetchedBytes:    mp.prefetchedBytes.With(labels),
		AcksCounter:        mp.acksCounter.With(labels),
		NacksCounter:       mp.nacksCounter.With(labels),
		DlqCounter:         mp.dlqCounter.With(labels),
		ProcessingTime:     mp.processingTime.With(labels),

		ProducersOpened:            mp.producersOpened.With(labels),
		ProducersClosed:            mp.producersClosed.With(labels),
		ProducersReconnectFailure:  mp.producersReconnectFailure.With(labels),
		ProducersReconnectMaxRetry: mp.producersReconnectMaxRetry.With(labels),
		ProducersPartitions:        mp.producersPartitions.With(labels),
		ConsumersOpened:            mp.consumersOpened.With(labels),
		ConsumersClosed:            mp.consumersClosed.With(labels),
		ConsumersReconnectFailure:  mp.consumersReconnectFailure.With(labels),
		ConsumersReconnectMaxRetry: mp.consumersReconnectMaxRetry.With(labels),
		ConsumersPartitions:        mp.consumersPartitions.With(labels),
		ReadersOpened:              mp.readersOpened.With(labels),
		ReadersClosed:              mp.readersClosed.With(labels),
	}

	return lm
}