public OpenTelemetryTopicStats()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java [178:388]


    /**
     * Builds the observable instruments used for per-topic metrics and registers a single batch callback that
     * records all of them for every topic currently held in the broker's topic map.
     *
     * <p>Every instrument is created with {@code buildObserver()}, so no values are recorded here; values are
     * produced only when the batch callback runs and delegates to {@code recordMetricsForTopic} for each
     * available topic.
     *
     * @param pulsar the Pulsar service that supplies the OpenTelemetry meter and, through its broker service,
     *               the map of topic futures iterated by the batch callback
     */
    public OpenTelemetryTopicStats(PulsarService pulsar) {
        this.pulsar = pulsar;
        var meter = pulsar.getOpenTelemetry().getMeter();

        subscriptionCounter = meter
                .upDownCounterBuilder(SUBSCRIPTION_COUNTER)
                .setUnit("{subscription}")
                .setDescription("The number of Pulsar subscriptions of the topic served by this broker.")
                .buildObserver();

        producerCounter = meter
                .upDownCounterBuilder(PRODUCER_COUNTER)
                .setUnit("{producer}")
                .setDescription("The number of active producers of the topic connected to this broker.")
                .buildObserver();

        consumerCounter = meter
                .upDownCounterBuilder(CONSUMER_COUNTER)
                .setUnit("{consumer}")
                .setDescription("The number of active consumers of the topic connected to this broker.")
                .buildObserver();

        messageInCounter = meter
                .counterBuilder(MESSAGE_IN_COUNTER)
                .setUnit("{message}")
                .setDescription("The total number of messages received for this topic.")
                .buildObserver();

        messageOutCounter = meter
                .counterBuilder(MESSAGE_OUT_COUNTER)
                .setUnit("{message}")
                .setDescription("The total number of messages read from this topic.")
                .buildObserver();

        bytesInCounter = meter
                .counterBuilder(BYTES_IN_COUNTER)
                .setUnit("By")
                .setDescription("The total number of messages bytes received for this topic.")
                .buildObserver();

        bytesOutCounter = meter
                .counterBuilder(BYTES_OUT_COUNTER)
                .setUnit("By")
                .setDescription("The total number of messages bytes read from this topic.")
                .buildObserver();

        publishRateLimitHitCounter = meter
                .counterBuilder(PUBLISH_RATE_LIMIT_HIT_COUNTER)
                .setUnit("{event}")
                .setDescription("The number of times the publish rate limit is triggered.")
                .buildObserver();

        storageCounter = meter
                .upDownCounterBuilder(STORAGE_COUNTER)
                .setUnit("By")
                .setDescription(
                        "The total storage size of the messages in this topic, including storage used by replicas.")
                .buildObserver();

        storageLogicalCounter = meter
                .upDownCounterBuilder(STORAGE_LOGICAL_COUNTER)
                .setUnit("By")
                .setDescription("The storage size of the messages in this topic, excluding storage used by replicas.")
                .buildObserver();

        storageBacklogCounter = meter
                .upDownCounterBuilder(STORAGE_BACKLOG_COUNTER)
                .setUnit("By")
                .setDescription("The size of the backlog storage for this topic.")
                .buildObserver();

        storageOffloadedCounter = meter
                .upDownCounterBuilder(STORAGE_OFFLOADED_COUNTER)
                .setUnit("By")
                .setDescription("The total amount of the data in this topic offloaded to the tiered storage.")
                .buildObserver();

        backlogQuotaLimitSize = meter
                .upDownCounterBuilder(BACKLOG_QUOTA_LIMIT_SIZE)
                .setUnit("By")
                .setDescription("The size based backlog quota limit for this topic.")
                .buildObserver();

        backlogQuotaLimitTime = meter
                .gaugeBuilder(BACKLOG_QUOTA_LIMIT_TIME)
                .ofLongs()
                .setUnit("s")
                .setDescription("The time based backlog quota limit for this topic.")
                .buildObserver();

        backlogEvictionCounter = meter
                .counterBuilder(BACKLOG_EVICTION_COUNTER)
                .setUnit("{eviction}")
                .setDescription("The number of times a backlog was evicted since it has exceeded its quota.")
                .buildObserver();

        backlogQuotaAge = meter
                .gaugeBuilder(BACKLOG_QUOTA_AGE)
                .ofLongs()
                .setUnit("s")
                .setDescription("The age of the oldest unacknowledged message (backlog).")
                .buildObserver();

        storageOutCounter = meter
                .counterBuilder(STORAGE_OUT_COUNTER)
                .setUnit("{entry}")
                .setDescription("The total message batches (entries) written to the storage for this topic.")
                .buildObserver();

        storageInCounter = meter
                .counterBuilder(STORAGE_IN_COUNTER)
                .setUnit("{entry}")
                .setDescription("The total message batches (entries) read from the storage for this topic.")
                .buildObserver();

        compactionRemovedCounter = meter
                .counterBuilder(COMPACTION_REMOVED_COUNTER)
                .setUnit("{message}")
                .setDescription("The total number of messages removed by compaction.")
                .buildObserver();

        compactionOperationCounter = meter
                .counterBuilder(COMPACTION_OPERATION_COUNTER)
                .setUnit("{operation}")
                .setDescription("The total number of compaction operations.")
                .buildObserver();

        compactionDurationSeconds = meter
                .upDownCounterBuilder(COMPACTION_DURATION_SECONDS)
                .ofDoubles()
                .setUnit("s")
                .setDescription("The total time duration of compaction operations on the topic.")
                .buildObserver();

        compactionBytesInCounter = meter
                .counterBuilder(COMPACTION_BYTES_IN_COUNTER)
                .setUnit("By")
                .setDescription("The total count of bytes read by the compaction process for this topic.")
                .buildObserver();

        compactionBytesOutCounter = meter
                .counterBuilder(COMPACTION_BYTES_OUT_COUNTER)
                .setUnit("By")
                .setDescription("The total count of bytes written by the compaction process for this topic.")
                .buildObserver();

        compactionEntriesCounter = meter
                .counterBuilder(COMPACTION_ENTRIES_COUNTER)
                .setUnit("{entry}")
                .setDescription("The total number of compacted entries.")
                .buildObserver();

        compactionBytesCounter = meter
                .counterBuilder(COMPACTION_BYTES_COUNTER)
                .setUnit("By")
                .setDescription("The total size of the compacted entries.")
                .buildObserver();

        transactionCounter = meter
                .upDownCounterBuilder(TRANSACTION_COUNTER)
                .setUnit("{transaction}")
                .setDescription("The number of transactions on this topic.")
                .buildObserver();

        transactionBufferClientOperationCounter = meter
                .counterBuilder(TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
                .setUnit("{operation}")
                .setDescription("The number of operations on the transaction buffer client.")
                .buildObserver();

        delayedSubscriptionCounter = meter
                .upDownCounterBuilder(DELAYED_SUBSCRIPTION_COUNTER)
                .setUnit("{entry}")
                .setDescription("The total number of message batches (entries) delayed for dispatching.")
                .buildObserver();

        // One callback feeds all instruments so that each topic is visited once per callback invocation.
        batchCallback = meter.batchCallback(() -> pulsar.getBrokerService()
                        .getTopics()
                        .values()
                        .stream()
                        // getNow() rethrows when a future failed or was cancelled, which would abort the whole
                        // pipeline and drop the metrics of every remaining topic. Only read futures that have
                        // already completed normally; pending ones are skipped, as they previously yielded an
                        // empty Optional anyway.
                        .filter(topicFuture -> topicFuture.isDone() && !topicFuture.isCompletedExceptionally())
                        .map(topicFuture -> topicFuture.getNow(Optional.empty()))
                        .forEach(topic -> topic.ifPresent(this::recordMetricsForTopic)),
                subscriptionCounter,
                producerCounter,
                consumerCounter,
                messageInCounter,
                messageOutCounter,
                bytesInCounter,
                bytesOutCounter,
                publishRateLimitHitCounter,
                storageCounter,
                storageLogicalCounter,
                storageBacklogCounter,
                storageOffloadedCounter,
                backlogQuotaLimitSize,
                backlogQuotaLimitTime,
                backlogEvictionCounter,
                backlogQuotaAge,
                storageOutCounter,
                storageInCounter,
                compactionRemovedCounter,
                compactionOperationCounter,
                compactionDurationSeconds,
                compactionBytesInCounter,
                compactionBytesOutCounter,
                compactionEntriesCounter,
                compactionBytesCounter,
                transactionCounter,
                transactionBufferClientOperationCounter,
                delayedSubscriptionCounter);
    }