in pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java [178:388]
/**
 * Creates the per-topic OpenTelemetry instruments and registers a single batch callback that
 * reports values for all of them.
 *
 * <p>Every instrument is built as an asynchronous (observer) instrument from the meter returned by
 * {@code pulsar.getOpenTelemetry().getMeter()}. Nothing is recorded at construction time; values
 * are produced when the registered batch callback runs, which walks the topics returned by
 * {@code pulsar.getBrokerService().getTopics()} and hands each loaded topic to
 * {@code recordMetricsForTopic}.
 *
 * @param pulsar the broker service instance supplying both the OpenTelemetry meter and the topics
 *               to report on
 */
public OpenTelemetryTopicStats(PulsarService pulsar) {
    this.pulsar = pulsar;
    var meter = pulsar.getOpenTelemetry().getMeter();

    // Subscriptions, producers and consumers.
    subscriptionCounter = meter
            .upDownCounterBuilder(SUBSCRIPTION_COUNTER)
            .setUnit("{subscription}")
            .setDescription("The number of Pulsar subscriptions of the topic served by this broker.")
            .buildObserver();

    producerCounter = meter
            .upDownCounterBuilder(PRODUCER_COUNTER)
            .setUnit("{producer}")
            .setDescription("The number of active producers of the topic connected to this broker.")
            .buildObserver();

    consumerCounter = meter
            .upDownCounterBuilder(CONSUMER_COUNTER)
            .setUnit("{consumer}")
            .setDescription("The number of active consumers of the topic connected to this broker.")
            .buildObserver();

    // Message and byte throughput.
    messageInCounter = meter
            .counterBuilder(MESSAGE_IN_COUNTER)
            .setUnit("{message}")
            .setDescription("The total number of messages received for this topic.")
            .buildObserver();

    messageOutCounter = meter
            .counterBuilder(MESSAGE_OUT_COUNTER)
            .setUnit("{message}")
            .setDescription("The total number of messages read from this topic.")
            .buildObserver();

    bytesInCounter = meter
            .counterBuilder(BYTES_IN_COUNTER)
            .setUnit("By")
            .setDescription("The total number of messages bytes received for this topic.")
            .buildObserver();

    bytesOutCounter = meter
            .counterBuilder(BYTES_OUT_COUNTER)
            .setUnit("By")
            .setDescription("The total number of messages bytes read from this topic.")
            .buildObserver();

    publishRateLimitHitCounter = meter
            .counterBuilder(PUBLISH_RATE_LIMIT_HIT_COUNTER)
            .setUnit("{event}")
            .setDescription("The number of times the publish rate limit is triggered.")
            .buildObserver();

    // Storage sizes.
    storageCounter = meter
            .upDownCounterBuilder(STORAGE_COUNTER)
            .setUnit("By")
            .setDescription(
                    "The total storage size of the messages in this topic, including storage used by replicas.")
            .buildObserver();

    storageLogicalCounter = meter
            .upDownCounterBuilder(STORAGE_LOGICAL_COUNTER)
            .setUnit("By")
            .setDescription("The storage size of the messages in this topic, excluding storage used by replicas.")
            .buildObserver();

    storageBacklogCounter = meter
            .upDownCounterBuilder(STORAGE_BACKLOG_COUNTER)
            .setUnit("By")
            .setDescription("The size of the backlog storage for this topic.")
            .buildObserver();

    storageOffloadedCounter = meter
            .upDownCounterBuilder(STORAGE_OFFLOADED_COUNTER)
            .setUnit("By")
            .setDescription("The total amount of the data in this topic offloaded to the tiered storage.")
            .buildObserver();

    // Backlog quota.
    backlogQuotaLimitSize = meter
            .upDownCounterBuilder(BACKLOG_QUOTA_LIMIT_SIZE)
            .setUnit("By")
            .setDescription("The size based backlog quota limit for this topic.")
            .buildObserver();

    backlogQuotaLimitTime = meter
            .gaugeBuilder(BACKLOG_QUOTA_LIMIT_TIME)
            .ofLongs()
            .setUnit("s")
            .setDescription("The time based backlog quota limit for this topic.")
            .buildObserver();

    backlogEvictionCounter = meter
            .counterBuilder(BACKLOG_EVICTION_COUNTER)
            .setUnit("{eviction}")
            .setDescription("The number of times a backlog was evicted since it has exceeded its quota.")
            .buildObserver();

    backlogQuotaAge = meter
            .gaugeBuilder(BACKLOG_QUOTA_AGE)
            .ofLongs()
            .setUnit("s")
            .setDescription("The age of the oldest unacknowledged message (backlog).")
            .buildObserver();

    // Storage entry throughput.
    storageOutCounter = meter
            .counterBuilder(STORAGE_OUT_COUNTER)
            .setUnit("{entry}")
            .setDescription("The total message batches (entries) written to the storage for this topic.")
            .buildObserver();

    storageInCounter = meter
            .counterBuilder(STORAGE_IN_COUNTER)
            .setUnit("{entry}")
            .setDescription("The total message batches (entries) read from the storage for this topic.")
            .buildObserver();

    // Compaction.
    compactionRemovedCounter = meter
            .counterBuilder(COMPACTION_REMOVED_COUNTER)
            .setUnit("{message}")
            .setDescription("The total number of messages removed by compaction.")
            .buildObserver();

    compactionOperationCounter = meter
            .counterBuilder(COMPACTION_OPERATION_COUNTER)
            .setUnit("{operation}")
            .setDescription("The total number of compaction operations.")
            .buildObserver();

    // The metric is described as a cumulative total, so it is declared as a (monotonic) counter
    // like the other compaction totals rather than as an up-down counter.
    compactionDurationSeconds = meter
            .counterBuilder(COMPACTION_DURATION_SECONDS)
            .ofDoubles()
            .setUnit("s")
            .setDescription("The total time duration of compaction operations on the topic.")
            .buildObserver();

    compactionBytesInCounter = meter
            .counterBuilder(COMPACTION_BYTES_IN_COUNTER)
            .setUnit("By")
            .setDescription("The total count of bytes read by the compaction process for this topic.")
            .buildObserver();

    compactionBytesOutCounter = meter
            .counterBuilder(COMPACTION_BYTES_OUT_COUNTER)
            .setUnit("By")
            .setDescription("The total count of bytes written by the compaction process for this topic.")
            .buildObserver();

    compactionEntriesCounter = meter
            .counterBuilder(COMPACTION_ENTRIES_COUNTER)
            .setUnit("{entry}")
            .setDescription("The total number of compacted entries.")
            .buildObserver();

    compactionBytesCounter = meter
            .counterBuilder(COMPACTION_BYTES_COUNTER)
            .setUnit("By")
            .setDescription("The total size of the compacted entries.")
            .buildObserver();

    // Transactions.
    transactionCounter = meter
            .upDownCounterBuilder(TRANSACTION_COUNTER)
            .setUnit("{transaction}")
            .setDescription("The number of transactions on this topic.")
            .buildObserver();

    transactionBufferClientOperationCounter = meter
            .counterBuilder(TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
            .setUnit("{operation}")
            .setDescription("The number of operations on the transaction buffer client.")
            .buildObserver();

    // Delayed delivery.
    delayedSubscriptionCounter = meter
            .upDownCounterBuilder(DELAYED_SUBSCRIPTION_COUNTER)
            .setUnit("{entry}")
            .setDescription("The total number of message batches (entries) delayed for dispatching.")
            .buildObserver();

    // Only futures that have completed normally are dereferenced. CompletableFuture.getNow throws
    // for a future that failed or was cancelled, which would abort the whole callback and drop the
    // measurements of every remaining topic. Once a future is done its outcome is final, so the
    // filter cannot be invalidated before getNow runs. Pending futures are skipped, exactly as the
    // Optional.empty() default would have skipped them.
    Runnable recordAllTopics = () -> pulsar.getBrokerService()
            .getTopics()
            .values()
            .stream()
            .filter(topicFuture -> topicFuture.isDone() && !topicFuture.isCompletedExceptionally())
            .map(topicFuture -> topicFuture.getNow(Optional.empty()))
            .forEach(topic -> topic.ifPresent(this::recordMetricsForTopic));

    // Every instrument written by recordMetricsForTopic must be listed here for the callback to be
    // allowed to record to it.
    batchCallback = meter.batchCallback(recordAllTopics,
            subscriptionCounter,
            producerCounter,
            consumerCounter,
            messageInCounter,
            messageOutCounter,
            bytesInCounter,
            bytesOutCounter,
            publishRateLimitHitCounter,
            storageCounter,
            storageLogicalCounter,
            storageBacklogCounter,
            storageOffloadedCounter,
            backlogQuotaLimitSize,
            backlogQuotaLimitTime,
            backlogEvictionCounter,
            backlogQuotaAge,
            storageOutCounter,
            storageInCounter,
            compactionRemovedCounter,
            compactionOperationCounter,
            compactionDurationSeconds,
            compactionBytesInCounter,
            compactionBytesOutCounter,
            compactionEntriesCounter,
            compactionBytesCounter,
            transactionCounter,
            transactionBufferClientOperationCounter,
            delayedSubscriptionCounter);
}