in store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java [106:227]
public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier,
DefaultMessageStore messageStore) {
DefaultStoreMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
DefaultStoreMetricsManager.messageStoreConfig = messageStore.getMessageStoreConfig();
storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
.setDescription("Broker storage size")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> {
File storeDir = new File(messageStoreConfig.getStorePathRootDir());
if (storeDir.exists() && storeDir.isDirectory()) {
long totalSpace = storeDir.getTotalSpace();
if (totalSpace > 0) {
measurement.record(totalSpace - storeDir.getFreeSpace(), newAttributesBuilder().build());
}
}
});
flushBehind = meter.gaugeBuilder(GAUGE_STORAGE_FLUSH_BEHIND)
.setDescription("Broker flush behind bytes")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> measurement.record(messageStore.flushBehindBytes(), newAttributesBuilder().build()));
dispatchBehind = meter.gaugeBuilder(GAUGE_STORAGE_DISPATCH_BEHIND)
.setDescription("Broker dispatch behind bytes")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> measurement.record(messageStore.dispatchBehindBytes(), newAttributesBuilder().build()));
messageReserveTime = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
.setDescription("Broker message reserve time")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
long earliestMessageTime = messageStore.getEarliestMessageTime();
if (earliestMessageTime <= 0) {
return;
}
measurement.record(System.currentTimeMillis() - earliestMessageTime, newAttributesBuilder().build());
});
if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
.setDescription("Timer enqueue messages lag")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build());
});
timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
.setDescription("Timer enqueue latency")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build());
});
timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
.setDescription("Timer dequeue messages lag")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build());
});
timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
.setDescription("Timer dequeue latency")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build());
});
timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
.setDescription("Current message number in timing")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
timerMessageStore.getTimerMetrics()
.getTimingCount()
.forEach((topic, metric) -> {
measurement.record(
metric.getCount().get(),
newAttributesBuilder().put(LABEL_TOPIC, topic).build()
);
});
});
timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
.setDescription("Total number of timer dequeue")
.build();
timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
.setDescription("Total number of timer enqueue")
.build();
timerMessageSnapshot = meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT)
.setDescription("Timer message distribution snapshot, only count timing messages in 24h.")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMetrics timerMetrics = messageStore.getTimerMessageStore().getTimerMetrics();
TimerWheel timerWheel = messageStore.getTimerMessageStore().getTimerWheel();
int precisionMs = messageStoreConfig.getTimerPrecisionMs();
List<Integer> timerDist = timerMetrics.getTimerDistList();
long currTime = System.currentTimeMillis() / precisionMs * precisionMs;
for (int i = 0; i < timerDist.size(); i++) {
int slotBeforeNum = i == 0 ? 0 : timerDist.get(i - 1) * 1000 / precisionMs;
int slotTotalNum = timerDist.get(i) * 1000 / precisionMs;
int periodTotal = 0;
for (int j = slotBeforeNum; j < slotTotalNum; j++) {
Slot slotEach = timerWheel.getSlot(currTime + (long) j * precisionMs);
periodTotal += slotEach.num;
}
measurement.record(periodTotal, newAttributesBuilder().put(LABEL_TIMING_BOUND, timerDist.get(i).toString()).build());
}
});
timerMessageSetLatency = meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY)
.setDescription("Timer message set latency distribution")
.setUnit("seconds")
.ofLongs()
.build();
}
}