in src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java [104:146]
public void collectBroker() {
if (!rmqConfigure.isEnableDashBoardCollect()) {
return;
}
try {
Date date = new Date();
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
Map<String, String> addresses = Maps.newHashMap();
for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
HashMap<Long, String> addrs = clusterEntry.getValue().getBrokerAddrs();
Set<Map.Entry<Long, String>> addrsEntries = addrs.entrySet();
for (Map.Entry<Long, String> addrEntry : addrsEntries) {
addresses.put(addrEntry.getValue(), clusterEntry.getKey() + ":" + addrEntry.getKey());
}
}
Set<Map.Entry<String, String>> entries = addresses.entrySet();
for (Map.Entry<String, String> entry : entries) {
List<String> list = dashboardCollectService.getBrokerMap().get(entry.getValue());
if (null == list) {
list = Lists.newArrayList();
}
KVTable kvTable = fetchBrokerRuntimeStats(entry.getKey(), 3);
if (kvTable == null) {
continue;
}
String[] tpsArray = kvTable.getTable().get("getTotalTps").split(" ");
BigDecimal totalTps = new BigDecimal(0);
for (String tps : tpsArray) {
totalTps = totalTps.add(new BigDecimal(tps));
}
BigDecimal averageTps = totalTps.divide(new BigDecimal(tpsArray.length), 5, BigDecimal.ROUND_HALF_UP);
list.add(date.getTime() + "," + averageTps.toString());
dashboardCollectService.getBrokerMap().put(entry.getValue(), list);
}
log.debug("Broker Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getBrokerMap().asMap()));
}
catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}