in pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java [148:509]
public static void printTopicStats(PrometheusMetricStreams stream, TopicStats stats,
Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
String topic, boolean splitTopicAndPartitionIndexLabel) {
writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_producers_count", stats.producersCount,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_consumers_count", stats.consumersCount,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_rate_in", stats.rateIn,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_rate_out", stats.rateOut,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_throughput_in", stats.throughputIn,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_throughput_out", stats.throughputOut,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_txn_tb_active_total", stats.ongoingTxnCount,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_txn_tb_committed_total", stats.committedTxnCount,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_logical_size",
stats.managedLedgerStats.storageLogicalSize, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_msg_backlog", stats.msgBacklog,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_read_cache_misses_rate",
stats.managedLedgerStats.storageReadCacheMissesRate,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_offloaded_size", stats.managedLedgerStats
.offloadedStorageUsed, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_backlog_age_seconds", stats.backlogAgeSeconds,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeBacklogQuotaMetric(stream, "pulsar_storage_backlog_quota_exceeded_evictions_total",
stats.sizeBasedBacklogQuotaExceededEvictionCount, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, BacklogQuotaType.destination_storage);
writeBacklogQuotaMetric(stream, "pulsar_storage_backlog_quota_exceeded_evictions_total",
stats.timeBasedBacklogQuotaExceededEvictionCount, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, BacklogQuotaType.message_age);
writeMetric(stream, "pulsar_delayed_message_index_size_bytes", stats.delayedMessageIndexSizeInBytes,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
for (TopicMetricBean topicMetricBean : stats.bucketDelayedIndexStats.values()) {
writeTopicMetric(stream, topicMetricBean.name, topicMetricBean.value, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel, topicMetricBean.labelsAndValues);
}
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
latencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_1",
latencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_5",
latencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_10",
latencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_20",
latencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_50",
latencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_100",
latencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_200",
latencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_le_1000",
latencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_overflow",
latencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_count",
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_write_latency_sum",
stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
ledgerWriteLatencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
ledgerWriteLatencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
ledgerWriteLatencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
ledgerWriteLatencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
ledgerWriteLatencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
ledgerWriteLatencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
ledgerWriteLatencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
ledgerWriteLatencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
ledgerWriteLatencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
ledgerWriteLatencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
stats.producerStats.forEach((p, producerStats) -> {
writeProducerMetric(stream, "pulsar_producer_msg_rate_in", producerStats.msgRateIn,
cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
writeProducerMetric(stream, "pulsar_producer_msg_throughput_in", producerStats.msgThroughputIn,
cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize,
cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
});
stats.subscriptionStats.forEach((sub, subsStats) -> {
writeSubscriptionMetric(stream, "pulsar_subscription_back_log", subsStats.msgBacklog,
cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_back_log_no_delayed",
subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_in_replay",
subsStats.msgInReplay, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
subsStats.unackedMessages, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_blocked_on_unacked_messages",
subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_out",
subsStats.msgRateOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_ack_rate",
subsStats.messageAckRate, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_throughput_out",
subsStats.msgThroughputOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_out_bytes_total",
subsStats.bytesOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_out_messages_total",
subsStats.msgOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_last_expire_timestamp",
subsStats.lastExpireTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_last_acked_timestamp",
subsStats.lastAckedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_flow_timestamp",
subsStats.lastConsumedFlowTimestamp, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_timestamp",
subsStats.lastConsumedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_last_mark_delete_advanced_timestamp",
subsStats.lastMarkDeleteAdvancedTimestamp, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_expired",
subsStats.msgRateExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_total_msg_expired",
subsStats.totalMsgExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_consumers_count",
subsStats.consumersCount, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_filter_processed_msg_count",
subsStats.filterProcessedMsgCount, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_filter_accepted_msg_count",
subsStats.filterAcceptedMsgCount, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_filter_rejected_msg_count",
subsStats.filterRejectedMsgCount, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_filter_rescheduled_msg_count",
subsStats.filterRescheduledMsgCount, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_delayed_message_index_size_bytes",
subsStats.delayedMessageIndexSizeInBytes, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
// write dispatch throttling metrics with `reason` labels to identify specific throttling
// causes: by subscription limit, by topic limit, or by broker limit.
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
subsStats.dispatchThrottledMsgEventsBySubscriptionLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "subscription");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
subsStats.dispatchThrottledBytesEventsBySubscriptionLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "subscription");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
subsStats.dispatchThrottledMsgEventsByTopicLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "topic");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
subsStats.dispatchThrottledBytesEventsByTopicLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "topic");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
subsStats.dispatchThrottledMsgEventsByBrokerLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "broker");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
subsStats.dispatchThrottledBytesEventsByBrokerLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "broker");
final String[] subscriptionLabel = {"subscription", sub};
for (TopicMetricBean topicMetricBean : subsStats.bucketDelayedIndexStats.values()) {
String[] labelsAndValues = ArrayUtils.addAll(subscriptionLabel, topicMetricBean.labelsAndValues);
writeTopicMetric(stream, topicMetricBean.name, topicMetricBean.value, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel, labelsAndValues);
}
subsStats.consumerStat.forEach((c, consumerStats) -> {
writeConsumerMetric(stream, "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_consumer_blocked_on_unacked_messages",
consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_consumer_available_permits", consumerStats.availablePermits,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
writeConsumerMetric(stream, "pulsar_out_messages_total", consumerStats.msgOutCounter,
cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
});
});
if (!stats.replicationStats.isEmpty()) {
stats.replicationStats.forEach((remoteCluster, replStats) -> {
writeMetric(stream, "pulsar_replication_rate_in", replStats.msgRateIn,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_rate_out", replStats.msgRateOut,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_throughput_in", replStats.msgThroughputIn,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_throughput_out", replStats.msgThroughputOut,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_backlog", replStats.replicationBacklog,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_disconnected_count", replStats.disconnectedCount,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
});
}
writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
.isPresent();
if (hasCompaction) {
writeMetric(stream, "pulsar_compaction_removed_event_count",
stats.compactionRemovedEventCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_succeed_count",
stats.compactionSucceedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_failed_count",
stats.compactionFailedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_duration_time_in_mills",
stats.compactionDurationTimeInMills, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_read_throughput",
stats.compactionReadThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_write_throughput",
stats.compactionWriteThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_compacted_entries_count",
stats.compactionCompactedEntriesCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_compacted_entries_size",
stats.compactionCompactedEntriesSize, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
long[] compactionBuckets = stats.compactionLatencyBuckets.getBuckets();
writeMetric(stream, "pulsar_compaction_latency_le_0_5",
compactionBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_1",
compactionBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_5",
compactionBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_10",
compactionBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_20",
compactionBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_50",
compactionBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_100",
compactionBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_200",
compactionBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_le_1000",
compactionBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_overflow",
compactionBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_sum",
stats.compactionLatencyBuckets.getSum(), cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_compaction_latency_count",
stats.compactionLatencyBuckets.getCount(), cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
for (TopicMetricBean topicMetricBean : stats.bucketDelayedIndexStats.values()) {
String[] labelsAndValues = topicMetricBean.labelsAndValues;
writeTopicMetric(stream, topicMetricBean.name, topicMetricBean.value, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel, labelsAndValues);
}
}
}