in tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java [179:303]
private void printClusterBaseInfo(final Set<String> clusterNames,
final DefaultMQAdminExt defaultMQAdminExt,
final ClusterInfo clusterInfo) {
System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %30s %-22s %-11s %-12s %-8s %-10s%n",
"#Cluster Name",
"#Broker Name",
"#BID",
"#Addr",
"#Version",
"#InTPS(LOAD)",
"#OutTPS(LOAD)",
"#Timer(Progress)",
"#PCWait(ms)",
"#Hour",
"#SPACE",
"#ACTIVATED"
);
for (String clusterName : clusterNames) {
TreeSet<String> brokerNameTreeSet = new TreeSet<>();
Set<String> brokerNameSet = clusterInfo.getClusterAddrTable().get(clusterName);
if (brokerNameSet != null && !brokerNameSet.isEmpty()) {
brokerNameTreeSet.addAll(brokerNameSet);
}
for (String brokerName : brokerNameTreeSet) {
BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
if (brokerData != null) {
Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
while (itAddr.hasNext()) {
Map.Entry<Long, String> next1 = itAddr.next();
double in = 0;
double out = 0;
String version = "";
String sendThreadPoolQueueSize = "";
String pullThreadPoolQueueSize = "";
String ackThreadPoolQueueSize = "";
String sendThreadPoolQueueHeadWaitTimeMills = "";
String pullThreadPoolQueueHeadWaitTimeMills = "";
String ackThreadPoolQueueHeadWaitTimeMills = "";
String pageCacheLockTimeMills = "";
String earliestMessageTimeStamp = "";
String commitLogDiskRatio = "";
long timerReadBehind = 0;
long timerOffsetBehind = 0;
long timerCongestNum = 0;
float timerEnqueueTps = 0.0f;
float timerDequeueTps = 0.0f;
boolean isBrokerActive = false;
try {
KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue());
isBrokerActive = Boolean.parseBoolean(kvTable.getTable().get("brokerActive"));
String putTps = kvTable.getTable().get("putTps");
String getTransferredTps = kvTable.getTable().get("getTransferredTps");
sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize");
pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize");
ackThreadPoolQueueSize = kvTable.getTable().getOrDefault("ackThreadPoolQueueSize", "N");
sendThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills");
pullThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills");
ackThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().getOrDefault("ackThreadPoolQueueHeadWaitTimeMills", "N");
pageCacheLockTimeMills = kvTable.getTable().get("pageCacheLockTimeMills");
earliestMessageTimeStamp = kvTable.getTable().get("earliestMessageTimeStamp");
commitLogDiskRatio = kvTable.getTable().get("commitLogDiskRatio");
try {
timerReadBehind = Long.parseLong(kvTable.getTable().get("timerReadBehind"));
timerOffsetBehind = Long.parseLong(kvTable.getTable().get("timerOffsetBehind"));
timerCongestNum = Long.parseLong(kvTable.getTable().get("timerCongestNum"));
timerEnqueueTps = Float.parseFloat(kvTable.getTable().get("timerEnqueueTps"));
timerDequeueTps = Float.parseFloat(kvTable.getTable().get("timerDequeueTps"));
} catch (Throwable ignored) {
}
version = kvTable.getTable().get("brokerVersionDesc");
if (StringUtils.isNotBlank(putTps)) {
String[] tpss = putTps.split(" ");
if (tpss.length > 0) {
in = Double.parseDouble(tpss[0]);
}
}
if (StringUtils.isNotBlank(getTransferredTps)) {
String[] tpss = getTransferredTps.split(" ");
if (tpss.length > 0) {
out = Double.parseDouble(tpss[0]);
}
}
} catch (Exception e) {
e.printStackTrace();
}
double hour = 0.0;
double space = 0.0;
if (earliestMessageTimeStamp != null && earliestMessageTimeStamp.length() > 0) {
long mills = System.currentTimeMillis() - Long.parseLong(earliestMessageTimeStamp);
hour = mills / 1000.0 / 60.0 / 60.0;
}
if (commitLogDiskRatio != null && commitLogDiskRatio.length() > 0) {
space = Double.parseDouble(commitLogDiskRatio);
}
System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %30s %-22s %11s %-12s %-8s %10s%n",
clusterName,
brokerName,
next1.getKey(),
next1.getValue(),
version,
String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
String.format("%9.2f(%s,%sms|%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills, ackThreadPoolQueueSize, ackThreadPoolQueueHeadWaitTimeMills),
String.format("%d-%d(%.1fw, %.1f, %.1f)", timerReadBehind, timerOffsetBehind, timerCongestNum / 10000.0f, timerEnqueueTps, timerDequeueTps),
pageCacheLockTimeMills,
String.format("%2.2f", hour),
String.format("%.4f", space),
isBrokerActive
);
}
}
}
}
}