in pulsar/internal/metrics.go [100:539]
func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string,
registerer prometheus.Registerer) *Metrics {
constLabels := map[string]string{
"client": "go",
}
for k, v := range userDefinedLabels {
constLabels[k] = v
}
var metricsLevelLabels []string
// note: ints here mirror MetricsCardinality in client.go to avoid import cycle
switch metricsCardinality {
case 1: //MetricsCardinalityNone
metricsLevelLabels = []string{}
case 2: //MetricsCardinalityTenant
metricsLevelLabels = []string{"pulsar_tenant"}
case 3: //MetricsCardinalityNamespace
metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace"}
case 4: //MetricsCardinalityTopic
metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace", "topic"}
default: //Anything else is namespace
metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace"}
}
metrics := &Metrics{
metricsLevel: metricsCardinality,
messagesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_messages_published",
Help: "Counter of messages published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
bytesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_bytes_published",
Help: "Counter of messages published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
messagesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_producer_pending_messages",
Help: "Counter of messages pending to be published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
bytesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_producer_pending_bytes",
Help: "Counter of bytes pending to be published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
publishErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producer_errors",
Help: "Counter of publish errors",
ConstLabels: constLabels,
}, append(metricsLevelLabels, "error")),
publishLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "pulsar_client_producer_latency_seconds",
Help: "Publish latency experienced by the client",
ConstLabels: constLabels,
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}, metricsLevelLabels),
publishRPCLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "pulsar_client_producer_rpc_latency_seconds",
Help: "Publish RPC latency experienced internally by the client when sending data to receiving an ack",
ConstLabels: constLabels,
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}, metricsLevelLabels),
producersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_opened",
Help: "Counter of producers created by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_closed",
Help: "Counter of producers closed by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_producers_partitions_active",
Help: "Counter of individual partitions the producers are currently active",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_reconnect_failure",
Help: "Counter of reconnect failure of producers",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_reconnect_max_retry",
Help: "Counter of producer reconnect max retry reached",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_opened",
Help: "Counter of consumers created by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_closed",
Help: "Counter of consumers closed by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_reconnect_failure",
Help: "Counter of reconnect failure of consumers",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_reconnect_max_retry",
Help: "Counter of consumer reconnect max retry reached",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_consumers_partitions_active",
Help: "Counter of individual partitions the consumers are currently active",
ConstLabels: constLabels,
}, metricsLevelLabels),
messagesReceived: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_messages_received",
Help: "Counter of messages received by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
bytesReceived: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_bytes_received",
Help: "Counter of bytes received by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
prefetchedMessages: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_consumer_prefetched_messages",
Help: "Number of messages currently sitting in the consumer pre-fetch queue",
ConstLabels: constLabels,
}, metricsLevelLabels),
prefetchedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_consumer_prefetched_bytes",
Help: "Total number of bytes currently sitting in the consumer pre-fetch queue",
ConstLabels: constLabels,
}, metricsLevelLabels),
acksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumer_acks",
Help: "Counter of messages acked by client",
ConstLabels: constLabels,
}, metricsLevelLabels),
nacksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumer_nacks",
Help: "Counter of messages nacked by client",
ConstLabels: constLabels,
}, metricsLevelLabels),
dlqCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumer_dlq_messages",
Help: "Counter of messages sent to Dead letter queue",
ConstLabels: constLabels,
}, metricsLevelLabels),
processingTime: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "pulsar_client_consumer_processing_time_seconds",
Help: "Time it takes for application to process messages",
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
ConstLabels: constLabels,
}, metricsLevelLabels),
readersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_readers_opened",
Help: "Counter of readers created by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
readersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_readers_closed",
Help: "Counter of readers closed by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
ConnectionsOpened: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_opened",
Help: "Counter of connections created by the client",
ConstLabels: constLabels,
}),
ConnectionsClosed: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_closed",
Help: "Counter of connections closed by the client",
ConstLabels: constLabels,
}),
ConnectionsEstablishmentErrors: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_establishment_errors",
Help: "Counter of errors in connections establishment",
ConstLabels: constLabels,
}),
ConnectionsHandshakeErrors: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_handshake_errors",
Help: "Counter of errors in connections handshake (eg: authz)",
ConstLabels: constLabels,
}),
LookupRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_lookup_count",
Help: "Counter of lookup requests made by the client",
ConstLabels: constLabels,
}),
PartitionedTopicMetadataRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_partitioned_topic_metadata_count",
Help: "Counter of partitioned_topic_metadata requests made by the client",
ConstLabels: constLabels,
}),
RPCRequestCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_rpc_count",
Help: "Counter of RPC requests made by the client",
ConstLabels: constLabels,
}),
}
err := registerer.Register(metrics.messagesPublished)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesPublished = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.bytesPublished)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesPublished = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.messagesPending)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesPending = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.bytesPending)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesPending = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.publishErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishErrors = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.publishLatency)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishLatency = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = registerer.Register(metrics.publishRPCLatency)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishRPCLatency = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = registerer.Register(metrics.messagesReceived)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesReceived = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.bytesReceived)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesReceived = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.prefetchedMessages)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.prefetchedMessages = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.prefetchedBytes)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.prefetchedBytes = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.acksCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.acksCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.nacksCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.nacksCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.dlqCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.dlqCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.processingTime)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.processingTime = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = registerer.Register(metrics.producersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersReconnectFailure)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersReconnectMaxRetry)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersPartitions)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.consumersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersReconnectFailure)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersReconnectMaxRetry)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersPartitions)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.readersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.readersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.readersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.readersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.ConnectionsOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsOpened = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.ConnectionsClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsClosed = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.ConnectionsEstablishmentErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsEstablishmentErrors = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.ConnectionsHandshakeErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsHandshakeErrors = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.LookupRequestsCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.LookupRequestsCount = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.PartitionedTopicMetadataRequestsCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.PartitionedTopicMetadataRequestsCount = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.RPCRequestCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
}
}
return metrics
}