in tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java [146:335]
public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier,
MessageStoreConfig storeConfig, MessageStoreFetcher fetcher,
FlatFileStore flatFileStore, MessageStore next) {
TieredStoreMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
apiLatency = meter.histogramBuilder(HISTOGRAM_API_LATENCY)
.setDescription("Tiered store rpc latency")
.setUnit("milliseconds")
.ofLongs()
.build();
providerRpcLatency = meter.histogramBuilder(HISTOGRAM_PROVIDER_RPC_LATENCY)
.setDescription("Tiered store rpc latency")
.setUnit("milliseconds")
.ofLongs()
.build();
uploadBytes = meter.histogramBuilder(HISTOGRAM_UPLOAD_BYTES)
.setDescription("Tiered store upload buffer size")
.setUnit("bytes")
.ofLongs()
.build();
downloadBytes = meter.histogramBuilder(HISTOGRAM_DOWNLOAD_BYTES)
.setDescription("Tiered store download buffer size")
.setUnit("bytes")
.ofLongs()
.build();
dispatchBehind = meter.gaugeBuilder(GAUGE_DISPATCH_BEHIND)
.setDescription("Tiered store dispatch behind message count")
.ofLongs()
.buildWithCallback(measurement -> {
for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
try {
MessageQueue mq = flatFile.getMessageQueue();
long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
continue;
}
Attributes commitLogAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
.put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
Attributes consumeQueueAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
.put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE, FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
.build();
measurement.record(Math.max(maxOffset - flatFile.getConsumeQueueMaxOffset(), 0), consumeQueueAttributes);
} catch (ConsumeQueueException e) {
// TODO: handle exception here
}
}
});
dispatchLatency = meter.gaugeBuilder(GAUGE_DISPATCH_LATENCY)
.setDescription("Tiered store dispatch latency")
.setUnit("seconds")
.ofLongs()
.buildWithCallback(measurement -> {
for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
try {
MessageQueue mq = flatFile.getMessageQueue();
long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
continue;
}
Attributes commitLogAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
.put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
Attributes consumeQueueAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
.put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE, FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
.build();
long consumeQueueDispatchOffset = flatFile.getConsumeQueueMaxOffset();
long consumeQueueDispatchLatency = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), consumeQueueDispatchOffset);
if (maxOffset <= consumeQueueDispatchOffset || consumeQueueDispatchLatency < 0) {
measurement.record(0, consumeQueueAttributes);
} else {
measurement.record(System.currentTimeMillis() - consumeQueueDispatchLatency, consumeQueueAttributes);
}
} catch (ConsumeQueueException e) {
// TODO: handle exception
}
}
});
messagesDispatchTotal = meter.counterBuilder(COUNTER_MESSAGES_DISPATCH_TOTAL)
.setDescription("Total number of dispatch messages")
.build();
messagesOutTotal = meter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL)
.setDescription("Total number of outgoing messages")
.build();
fallbackTotal = meter.counterBuilder(COUNTER_GET_MESSAGE_FALLBACK_TOTAL)
.setDescription("Total times of fallback to next store when getting message")
.build();
cacheCount = meter.gaugeBuilder(GAUGE_CACHE_COUNT)
.setDescription("Tiered store cache message count")
.ofLongs()
.buildWithCallback(measurement -> {
if (fetcher instanceof MessageStoreFetcherImpl) {
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().stats().loadCount();
measurement.record(count, newAttributesBuilder().build());
}
});
cacheBytes = meter.gaugeBuilder(GAUGE_CACHE_BYTES)
.setDescription("Tiered store cache message bytes")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> {
if (fetcher instanceof MessageStoreFetcherImpl) {
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize();
measurement.record(count, newAttributesBuilder().build());
}
});
cacheAccess = meter.counterBuilder(COUNTER_CACHE_ACCESS)
.setDescription("Tiered store cache access count")
.build();
cacheHit = meter.counterBuilder(COUNTER_CACHE_HIT)
.setDescription("Tiered store cache hit count")
.build();
storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
.setDescription("Broker storage size")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> {
Map<String, Map<FileSegmentType, Long>> topicFileSizeMap = new HashMap<>();
try {
MetadataStore metadataStore = flatFileStore.getMetadataStore();
metadataStore.iterateFileSegment(fileSegment -> {
Map<FileSegmentType, Long> subMap =
topicFileSizeMap.computeIfAbsent(fileSegment.getPath(), k -> new HashMap<>());
FileSegmentType fileSegmentType =
FileSegmentType.valueOf(fileSegment.getType());
Long size = subMap.computeIfAbsent(fileSegmentType, k -> 0L);
subMap.put(fileSegmentType, size + fileSegment.getSize());
});
} catch (Exception e) {
log.error("Failed to get storage size", e);
}
topicFileSizeMap.forEach((topic, subMap) -> {
subMap.forEach((fileSegmentType, size) -> {
Attributes attributes = newAttributesBuilder()
.put(LABEL_TOPIC, topic)
.put(LABEL_FILE_TYPE, fileSegmentType.name().toLowerCase())
.build();
measurement.record(size, attributes);
});
});
});
storageMessageReserveTime = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
.setDescription("Broker message reserve time")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
long timestamp = flatFile.getMinStoreTimestamp();
if (timestamp > 0) {
MessageQueue mq = flatFile.getMessageQueue();
Attributes attributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
.put(LABEL_QUEUE_ID, mq.getQueueId())
.build();
measurement.record(System.currentTimeMillis() - timestamp, attributes);
}
}
});
}