in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [586:645]
/**
 * Collects the per-minute put and get rates of every master broker in the cluster
 * and publishes them through the metrics collector.
 *
 * <p>The task is a no-op when collection is disabled in the configuration. If the
 * cluster topology cannot be fetched from the name server, the error is logged and
 * the run is abandoned. Failures for an individual broker or an individual metric
 * are logged and do not stop the remaining brokers/metrics from being collected.
 */
public void collectBrokerStats() {
    if (!rmqConfigure.isEnableCollect()) {
        return;
    }
    log.info("broker stats collection task starting....");
    long start = System.currentTimeMillis();

    ClusterInfo clusterInfo;
    try {
        clusterInfo = mqAdminExt.examineBrokerClusterInfo();
    } catch (Exception ex) {
        log.error(String.format("collectBrokerStats-get cluster info from namesrv error. address is %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
        return;
    }

    for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
        // Only the master of each broker group is queried; groups without a master are skipped.
        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (StringUtils.isBlank(masterAddr)) {
            continue;
        }
        String clusterName = brokerData.getCluster();
        String brokerName = brokerData.getBrokerName();

        // Put rate. Reported as the per-minute TPS, consistent with the get rate below
        // (previously this published the one-minute sum instead of the rate).
        try {
            BrokerStatsData putStats = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterName);
            metricsService.getCollector().addBrokerPutNumsMetric(
                clusterName,
                masterAddr,
                brokerName,
                Utils.getFixedDouble(putStats.getStatsMinute().getTps()));
        } catch (MQClientException ex) {
            // SYSTEM_ERROR responses are deliberately not logged (the original code had
            // that log statement disabled). NOTE(review): presumably the broker answers
            // SYSTEM_ERROR when it has no data for the stat yet -- confirm.
            if (ex.getResponseCode() != ResponseCode.SYSTEM_ERROR) {
                log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
            }
        } catch (Exception ex) {
            log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
        }

        // Get rate, collected independently so a put-side failure does not suppress it.
        try {
            BrokerStatsData getStats = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterName);
            metricsService.getCollector().addBrokerGetNumsMetric(
                clusterName,
                masterAddr,
                brokerName,
                Utils.getFixedDouble(getStats.getStatsMinute().getTps()));
        } catch (MQClientException ex) {
            // Same policy as above: SYSTEM_ERROR is intentionally silent.
            if (ex.getResponseCode() != ResponseCode.SYSTEM_ERROR) {
                log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
            }
        } catch (Exception ex) {
            log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
        }
    }
    log.info("broker stats collection task finished...." + (System.currentTimeMillis() - start));
}