in store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java [170:291]
/**
 * Initializes the statistics managed by this instance.
 *
 * <p>In order, this method:
 * <ol>
 *   <li>creates the two moment stats item sets (fall size and fall time);</li>
 *   <li>registers one {@code StatsItemSet} per stats name in {@code statsTable}
 *       (queue-level sets only when {@code enableQueueStat} is set);</li>
 *   <li>configures the account statistics manager: brief meta for {@code RT} and
 *       {@code INNER_RT}, one kind meta per account kind, and the getter that decides
 *       whether a statistics item is still online;</li>
 *   <li>schedules {@code cleanAllResource()} with a fixed delay of 10 minutes
 *       (first run after 10 minutes).</li>
 * </ol>
 */
public void init() {
    momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE,
        scheduledExecutorService, log);
    momentStatsItemSetFallTime = new MomentStatsItemSet(GROUP_GET_FALL_TIME,
        scheduledExecutorService, log);

    // Queue-level statistics are optional.
    if (enableQueueStat) {
        String[] queueStatNames = {
            Stats.QUEUE_PUT_NUMS, Stats.QUEUE_PUT_SIZE,
            Stats.QUEUE_GET_NUMS, Stats.QUEUE_GET_SIZE};
        for (String statName : queueStatNames) {
            this.statsTable.put(statName, new StatsItemSet(statName, this.scheduledExecutorService, log));
        }
    }

    // Sets that share the default executor and the default logger.
    // The order matches the original registration order.
    String[] defaultStatNames = {
        Stats.TOPIC_PUT_NUMS, Stats.TOPIC_PUT_SIZE,
        Stats.GROUP_GET_NUMS, Stats.GROUP_GET_SIZE,
        Stats.GROUP_ACK_NUMS, Stats.GROUP_CK_NUMS,
        Stats.GROUP_GET_LATENCY, Stats.TOPIC_PUT_LATENCY,
        Stats.SNDBCK_PUT_NUMS, DLQ_PUT_NUMS,
        Stats.BROKER_PUT_NUMS, Stats.BROKER_GET_NUMS,
        BROKER_ACK_NUMS, BROKER_CK_NUMS,
        BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
        Stats.GROUP_GET_FROM_DISK_NUMS, Stats.GROUP_GET_FROM_DISK_SIZE,
        Stats.BROKER_GET_FROM_DISK_NUMS, Stats.BROKER_GET_FROM_DISK_SIZE};
    for (String statName : defaultStatNames) {
        this.statsTable.put(statName, new StatsItemSet(statName, this.scheduledExecutorService, log));
    }

    // The send-back-to-DLQ counter reports to its own logger.
    this.statsTable.put(SNDBCK2DLQ_TIMES,
        new StatsItemSet(SNDBCK2DLQ_TIMES, this.scheduledExecutorService, DLQ_STAT_LOG));

    // Commercial statistics use a dedicated executor and logger.
    String[] commercialStatNames = {
        Stats.COMMERCIAL_SEND_TIMES, Stats.COMMERCIAL_RCV_TIMES,
        Stats.COMMERCIAL_SEND_SIZE, Stats.COMMERCIAL_RCV_SIZE,
        Stats.COMMERCIAL_RCV_EPOLLS, Stats.COMMERCIAL_SNDBCK_TIMES,
        Stats.COMMERCIAL_PERM_FAILURES};
    for (String statName : commercialStatNames) {
        this.statsTable.put(statName, new StatsItemSet(statName, this.commercialExecutor, COMMERCIAL_LOG));
    }

    String[] registrationStatNames = {CONSUMER_REGISTER_TIME, PRODUCER_REGISTER_TIME, CHANNEL_ACTIVITY};
    for (String statName : registrationStatNames) {
        this.statsTable.put(statName, new StatsItemSet(statName, this.scheduledExecutorService, log));
    }

    StatisticsItemFormatter formatter = new StatisticsItemFormatter();
    accountStatManager.setBriefMeta(new Pair[] {
        Pair.of(RT, new long[][] {{50, 50}, {100, 10}, {1000, 10}}),
        Pair.of(INNER_RT, new long[][] {{10, 10}, {100, 10}, {1000, 10}})});
    String[] itemNames = new String[] {
        MSG_NUM, SUCCESS_MSG_NUM, FAILURE_MSG_NUM, COMMERCIAL_MSG_NUM,
        SUCCESS_REQ_NUM, FAILURE_REQ_NUM,
        MSG_SIZE, SUCCESS_MSG_SIZE, FAILURE_MSG_SIZE,
        RT, INNER_RT};

    // Every account kind shares the same items, executor, formatter, logger and interval.
    String[] accountKinds = {
        ACCOUNT_SEND, ACCOUNT_RCV, ACCOUNT_SEND_BACK,
        ACCOUNT_SEND_BACK_TO_DLQ, ACCOUNT_SEND_REJ, ACCOUNT_REV_REJ};
    for (String accountKind : accountKinds) {
        this.accountStatManager.addStatisticsKindMeta(createStatisticsKindMeta(
            accountKind, itemNames, this.accountExecutor, formatter, ACCOUNT_LOG, ACCOUNT_STAT_INVERTAL));
    }

    this.accountStatManager.setStatisticsItemStateGetter(new StatisticsItemStateGetter() {
        @Override
        public boolean online(StatisticsItem item) {
            final String[] keyParts;
            try {
                keyParts = splitAccountStatKey(item.getStatObject());
            } catch (Exception e) {
                // Keep the cause in the log so a malformed key can be diagnosed;
                // the item is still reported as offline.
                log.warn("parse account stat key failed, key: {}", item.getStatObject(), e);
                return false;
            }
            // Indices 1..3 are read below, so at least four parts are required.
            if (keyParts == null || keyParts.length < 4) {
                return false;
            }
            String instanceId = keyParts[1];
            String topic = keyParts[2];
            String group = keyParts[3];

            String kind = item.getStatKind();
            if (ACCOUNT_SEND.equals(kind) || ACCOUNT_SEND_REJ.equals(kind)) {
                return producerStateGetter.online(instanceId, group, topic);
            }
            if (ACCOUNT_RCV.equals(kind) || ACCOUNT_SEND_BACK.equals(kind)
                || ACCOUNT_SEND_BACK_TO_DLQ.equals(kind) || ACCOUNT_REV_REJ.equals(kind)) {
                return consumerStateGetter.online(instanceId, group, topic);
            }
            // Unknown kinds are treated as offline.
            return false;
        }
    });

    cleanResourceExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                cleanAllResource();
            } catch (Exception e) {
                // An exception escaping a fixed-delay task suppresses all later runs,
                // so log the failure and let the next scheduled run proceed.
                log.warn("clean stats resource failed", e);
            }
        }
    }, 10, 10, TimeUnit.MINUTES);
}