in hertzbeat-collector/hertzbeat-collector-rocketmq/src/main/java/org/apache/hertzbeat/collector/collect/rocketmq/RocketmqSingleCollectImpl.java [187:248]
private void collectClusterData(DefaultMQAdminExt mqAdminExt, RocketmqCollectData rocketmqCollectData) throws Exception {
try {
List<RocketmqCollectData.ClusterBrokerData> clusterBrokerDataList = new ArrayList<>();
rocketmqCollectData.setClusterBrokerDataList(clusterBrokerDataList);
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
for (Map.Entry<Long, String> entry : brokerData.getBrokerAddrs().entrySet()) {
RocketmqCollectData.ClusterBrokerData clusterBrokerData = new RocketmqCollectData.ClusterBrokerData();
clusterBrokerDataList.add(clusterBrokerData);
clusterBrokerData.setBrokerId(entry.getKey());
clusterBrokerData.setAddress(entry.getValue());
KVTable kvTable = mqAdminExt.fetchBrokerRuntimeStats(entry.getValue());
clusterBrokerData.setVersion(kvTable.getTable().get("brokerVersionDesc"));
String putTps = kvTable.getTable().get("putTps");
if (StringUtils.isNotEmpty(putTps)) {
String[] putTpsArr = putTps.split(" ");
clusterBrokerData.setProducerMessageTps(Double.parseDouble(putTpsArr[0]));
}
String getTransferredTps = kvTable.getTable().get("getTransferedTps");
if (StringUtils.isNotEmpty(getTransferredTps)) {
String[] getTransferredTpsArr = getTransferredTps.split(" ");
clusterBrokerData.setConsumerMessageTps(Double.parseDouble(getTransferredTpsArr[0]));
}
String msgPutTotalTodayMorning = kvTable.getTable().get("msgPutTotalTodayMorning");
String msgPutTotalYesterdayMorning = kvTable.getTable().get("msgPutTotalYesterdayMorning");
if (StringUtils.isNotEmpty(msgPutTotalTodayMorning) && StringUtils.isNotEmpty(msgPutTotalYesterdayMorning)) {
long yesterdayProduceCount = Long.parseLong(msgPutTotalTodayMorning) - Long.parseLong(msgPutTotalYesterdayMorning);
clusterBrokerData.setYesterdayProduceCount(yesterdayProduceCount);
}
String msgGetTotalTodayMorning = kvTable.getTable().get("msgGetTotalTodayMorning");
String msgGetTotalYesterdayMorning = kvTable.getTable().get("msgGetTotalYesterdayMorning");
if (StringUtils.isNotEmpty(msgGetTotalTodayMorning) && StringUtils.isNotEmpty(msgGetTotalYesterdayMorning)) {
long yesterdayConsumerCount = Long.parseLong(msgGetTotalTodayMorning) - Long.parseLong(msgGetTotalYesterdayMorning);
clusterBrokerData.setYesterdayConsumeCount(yesterdayConsumerCount);
}
String msgPutTotalTodayNow = kvTable.getTable().get("msgPutTotalTodayNow");
if (StringUtils.isNotEmpty(msgPutTotalTodayNow) && StringUtils.isNotEmpty(msgPutTotalTodayMorning)) {
long todayProduceCount = Long.parseLong(msgPutTotalTodayNow) - Long.parseLong(msgPutTotalTodayMorning);
clusterBrokerData.setTodayProduceCount(todayProduceCount);
}
String msgGetTotalTodayNow = kvTable.getTable().get("msgGetTotalTodayNow");
if (StringUtils.isNotEmpty(msgGetTotalTodayNow) && StringUtils.isNotEmpty(msgGetTotalTodayMorning)) {
long todayConsumerCount = Long.parseLong(msgGetTotalTodayNow) - Long.parseLong(msgGetTotalTodayMorning);
clusterBrokerData.setTodayConsumeCount(todayConsumerCount);
}
}
}
} catch (Exception e) {
log.warn("collect rocketmq cluster data error", e);
throw e;
}
}