in pulsar/internal/metrics.go [541:593]
func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics {
labels := make(map[string]string, 3)
tn, err := ParseTopicName(t)
if err != nil {
return nil
}
topic := TopicNameWithoutPartitionPart(tn)
switch mp.metricsLevel {
case 4:
labels["topic"] = topic
fallthrough
case 3:
labels["pulsar_namespace"] = tn.Namespace
fallthrough
case 2:
labels["pulsar_tenant"] = tn.Tenant
}
lm := &LeveledMetrics{
MessagesPublished: mp.messagesPublished.With(labels),
BytesPublished: mp.bytesPublished.With(labels),
MessagesPending: mp.messagesPending.With(labels),
BytesPending: mp.bytesPending.With(labels),
PublishErrorsTimeout: mp.publishErrors.With(mergeMaps(labels, map[string]string{"error": "timeout"})),
PublishErrorsMsgTooLarge: mp.publishErrors.With(mergeMaps(labels, map[string]string{"error": "msg_too_large"})),
PublishLatency: mp.publishLatency.With(labels),
PublishRPCLatency: mp.publishRPCLatency.With(labels),
MessagesReceived: mp.messagesReceived.With(labels),
BytesReceived: mp.bytesReceived.With(labels),
PrefetchedMessages: mp.prefetchedMessages.With(labels),
PrefetchedBytes: mp.prefetchedBytes.With(labels),
AcksCounter: mp.acksCounter.With(labels),
NacksCounter: mp.nacksCounter.With(labels),
DlqCounter: mp.dlqCounter.With(labels),
ProcessingTime: mp.processingTime.With(labels),
ProducersOpened: mp.producersOpened.With(labels),
ProducersClosed: mp.producersClosed.With(labels),
ProducersReconnectFailure: mp.producersReconnectFailure.With(labels),
ProducersReconnectMaxRetry: mp.producersReconnectMaxRetry.With(labels),
ProducersPartitions: mp.producersPartitions.With(labels),
ConsumersOpened: mp.consumersOpened.With(labels),
ConsumersClosed: mp.consumersClosed.With(labels),
ConsumersReconnectFailure: mp.consumersReconnectFailure.With(labels),
ConsumersReconnectMaxRetry: mp.consumersReconnectMaxRetry.With(labels),
ConsumersPartitions: mp.consumersPartitions.With(labels),
ReadersOpened: mp.readersOpened.With(labels),
ReadersClosed: mp.readersClosed.With(labels),
}
return lm
}