public static void init()

in tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java [146:335]


    /**
     * Registers the tiered store instruments on the given meter and assigns them to the
     * corresponding static fields of this class.
     *
     * <p>Histograms and counters are only created here; they are recorded elsewhere. Gauges are
     * registered with callbacks that read {@code flatFileStore}, {@code next} and {@code fetcher}
     * every time the meter collects them.
     *
     * @param meter meter used to build all instruments
     * @param attributesBuilderSupplier supplier stored in the static field of the same name
     * @param storeConfig store configuration; not read by this method
     * @param fetcher message fetcher; cache gauges only report when it is a
     *     {@code MessageStoreFetcherImpl}
     * @param flatFileStore source of the flat files and of the metadata store inspected by the
     *     gauge callbacks
     * @param next store queried for max queue offsets and message store timestamps
     */
    public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier,
        MessageStoreConfig storeConfig, MessageStoreFetcher fetcher,
        FlatFileStore flatFileStore, MessageStore next) {

        TieredStoreMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;

        // NOTE(review): this description is identical to the provider rpc histogram below and
        // looks copy-pasted; kept as-is because it is part of the exported metric metadata.
        apiLatency = meter.histogramBuilder(HISTOGRAM_API_LATENCY)
            .setDescription("Tiered store rpc latency")
            .setUnit("milliseconds")
            .ofLongs()
            .build();

        providerRpcLatency = meter.histogramBuilder(HISTOGRAM_PROVIDER_RPC_LATENCY)
            .setDescription("Tiered store rpc latency")
            .setUnit("milliseconds")
            .ofLongs()
            .build();

        uploadBytes = meter.histogramBuilder(HISTOGRAM_UPLOAD_BYTES)
            .setDescription("Tiered store upload buffer size")
            .setUnit("bytes")
            .ofLongs()
            .build();

        downloadBytes = meter.histogramBuilder(HISTOGRAM_DOWNLOAD_BYTES)
            .setDescription("Tiered store download buffer size")
            .setUnit("bytes")
            .ofLongs()
            .build();

        // Per queue: how many offsets the next store is ahead of the flat file's consume queue.
        dispatchBehind = meter.gaugeBuilder(GAUGE_DISPATCH_BEHIND)
            .setDescription("Tiered store dispatch behind message count")
            .ofLongs()
            .buildWithCallback(measurement -> {
                for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
                    try {
                        MessageQueue mq = flatFile.getMessageQueue();
                        long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
                        long maxTimestamp =
                            next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
                        // Skip queues whose newest message is already older than the reserved time.
                        if (isOlderThanReservedTime(maxTimestamp, flatFile.getFileReservedHours())) {
                            continue;
                        }

                        long behind = Math.max(maxOffset - flatFile.getConsumeQueueMaxOffset(), 0);
                        measurement.record(behind,
                            newQueueFileAttributes(mq, FileSegmentType.CONSUME_QUEUE));
                    } catch (ConsumeQueueException e) {
                        // One failing queue must not prevent the remaining queues from reporting.
                        log.error("Failed to collect dispatch behind metric, queue="
                            + flatFile.getMessageQueue(), e);
                    }
                }
            });

        // NOTE(review): the declared unit is "seconds" but the recorded value is a difference of
        // millisecond timestamps; left unchanged because either fix alters the exported metric.
        dispatchLatency = meter.gaugeBuilder(GAUGE_DISPATCH_LATENCY)
            .setDescription("Tiered store dispatch latency")
            .setUnit("seconds")
            .ofLongs()
            .buildWithCallback(measurement -> {
                for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
                    try {
                        MessageQueue mq = flatFile.getMessageQueue();
                        long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
                        long maxTimestamp =
                            next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
                        // Skip queues whose newest message is already older than the reserved time.
                        if (isOlderThanReservedTime(maxTimestamp, flatFile.getFileReservedHours())) {
                            continue;
                        }

                        Attributes consumeQueueAttributes =
                            newQueueFileAttributes(mq, FileSegmentType.CONSUME_QUEUE);
                        long dispatchedOffset = flatFile.getConsumeQueueMaxOffset();
                        long dispatchedTimestamp =
                            next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), dispatchedOffset);
                        // Report zero when nothing is pending or the timestamp lookup is negative.
                        boolean nothingPending =
                            maxOffset <= dispatchedOffset || dispatchedTimestamp < 0;
                        long latency = nothingPending
                            ? 0L : System.currentTimeMillis() - dispatchedTimestamp;
                        measurement.record(latency, consumeQueueAttributes);
                    } catch (ConsumeQueueException e) {
                        // One failing queue must not prevent the remaining queues from reporting.
                        log.error("Failed to collect dispatch latency metric, queue="
                            + flatFile.getMessageQueue(), e);
                    }
                }
            });

        messagesDispatchTotal = meter.counterBuilder(COUNTER_MESSAGES_DISPATCH_TOTAL)
            .setDescription("Total number of dispatch messages")
            .build();

        messagesOutTotal = meter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL)
            .setDescription("Total number of outgoing messages")
            .build();

        fallbackTotal = meter.counterBuilder(COUNTER_GET_MESSAGE_FALLBACK_TOTAL)
            .setDescription("Total times of fallback to next store when getting message")
            .build();

        // Records the load count taken from the fetcher cache statistics.
        cacheCount = meter.gaugeBuilder(GAUGE_CACHE_COUNT)
            .setDescription("Tiered store cache message count")
            .ofLongs()
            .buildWithCallback(measurement -> {
                if (fetcher instanceof MessageStoreFetcherImpl) {
                    long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().stats().loadCount();
                    measurement.record(count, newAttributesBuilder().build());
                }
            });

        // NOTE(review): the value is the cache's estimatedSize(); confirm it is a byte figure.
        cacheBytes = meter.gaugeBuilder(GAUGE_CACHE_BYTES)
            .setDescription("Tiered store cache message bytes")
            .setUnit("bytes")
            .ofLongs()
            .buildWithCallback(measurement -> {
                if (fetcher instanceof MessageStoreFetcherImpl) {
                    long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize();
                    measurement.record(count, newAttributesBuilder().build());
                }
            });

        cacheAccess = meter.counterBuilder(COUNTER_CACHE_ACCESS)
            .setDescription("Tiered store cache access count")
            .build();

        cacheHit = meter.counterBuilder(COUNTER_CACHE_HIT)
            .setDescription("Tiered store cache hit count")
            .build();

        // Sums file segment sizes per (segment path, segment type); the path is exported under
        // the topic label.
        storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
            .setDescription("Broker storage size")
            .setUnit("bytes")
            .ofLongs()
            .buildWithCallback(measurement -> {
                Map<String, Map<FileSegmentType, Long>> topicFileSizeMap = new HashMap<>();
                try {
                    MetadataStore metadataStore = flatFileStore.getMetadataStore();
                    metadataStore.iterateFileSegment(fileSegment -> {
                        Map<FileSegmentType, Long> subMap =
                            topicFileSizeMap.computeIfAbsent(fileSegment.getPath(), k -> new HashMap<>());
                        FileSegmentType fileSegmentType =
                            FileSegmentType.valueOf(fileSegment.getType());
                        long segmentSize = fileSegment.getSize();
                        subMap.merge(fileSegmentType, segmentSize, Long::sum);
                    });
                } catch (Exception e) {
                    // Whatever was aggregated before the failure is still reported below.
                    log.error("Failed to get storage size", e);
                }
                topicFileSizeMap.forEach((topic, subMap) ->
                    subMap.forEach((fileSegmentType, size) -> {
                        Attributes attributes = newAttributesBuilder()
                            .put(LABEL_TOPIC, topic)
                            .put(LABEL_FILE_TYPE, fileSegmentType.name().toLowerCase(java.util.Locale.ROOT))
                            .build();
                        measurement.record(size, attributes);
                    }));
            });

        // Per queue: elapsed time since the flat file's minimum store timestamp, when positive.
        storageMessageReserveTime = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
            .setDescription("Broker message reserve time")
            .setUnit("milliseconds")
            .ofLongs()
            .buildWithCallback(measurement -> {
                for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
                    long timestamp = flatFile.getMinStoreTimestamp();
                    if (timestamp > 0) {
                        MessageQueue mq = flatFile.getMessageQueue();
                        Attributes attributes = newAttributesBuilder()
                            .put(LABEL_TOPIC, mq.getTopic())
                            .put(LABEL_QUEUE_ID, mq.getQueueId())
                            .build();
                        measurement.record(System.currentTimeMillis() - timestamp, attributes);
                    }
                }
            });
    }

    /**
     * Returns whether a positive timestamp lies further in the past than the reserved hours.
     */
    private static boolean isOlderThanReservedTime(long timestamp, long reservedHours) {
        return timestamp > 0
            && System.currentTimeMillis() - timestamp > TimeUnit.HOURS.toMillis(reservedHours);
    }

    /**
     * Builds the topic / queue id / file type attributes for one queue, lower-casing the file
     * type name independently of the default locale.
     */
    private static Attributes newQueueFileAttributes(MessageQueue mq, FileSegmentType fileType) {
        return newAttributesBuilder()
            .put(LABEL_TOPIC, mq.getTopic())
            .put(LABEL_QUEUE_ID, mq.getQueueId())
            .put(LABEL_FILE_TYPE, fileType.name().toLowerCase(java.util.Locale.ROOT))
            .build();
    }