in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [421:583]
public void collectBrokerStatsTopic() {
if (!rmqConfigure.isEnableCollect()) {
return;
}
log.info("broker topic stats collection task starting....");
long start = System.currentTimeMillis();
Set<String> topicSet = null;
try {
TopicList topicList = mqAdminExt.fetchAllTopicList();
topicSet = topicList.getTopicList();
} catch (Exception ex) {
log.error(String.format("collectBrokerStatsTopic-fetch topic list from namesrv error, the address is %s",
JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
return;
}
if (topicSet == null || topicSet.isEmpty()) {
return;
}
ClusterInfo clusterInfo = null;
try {
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception ex) {
log.error(String.format("collectBrokerStatsTopic-fetch cluster info exception, the address is %s",
JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
return;
}
for (String topic : topicSet) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
continue;
}
TopicRouteData topicRouteData = null;
try {
topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
} catch (Exception ex) {
log.error(String.format("fetch topic route error. ignore %s", topic), ex);
continue;
}
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (!StringUtils.isBlank(masterAddr)) {
BrokerStatsData bsd = null;
try {
//how many messages has sent for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
metricsService.getCollector().addTopicPutNumsMetric(
bd.getCluster(),
bd.getBrokerName(),
brokerIP,
topic,
Utils.getFixedDouble(bsd.getStatsMinute().getSum())
);
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
//log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s, %s", topic, masterAddr, ex.getErrorMessage()));
} else {
log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s", topic, masterAddr), ex);
}
} catch (RemotingTimeoutException | InterruptedException | RemotingSendRequestException | RemotingConnectException ex1) {
log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s", topic, masterAddr), ex1);
}
try {
//how many bytes has sent for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
metricsService.getCollector().addTopicPutSizeMetric(
bd.getCluster(),
bd.getBrokerName(),
brokerIP,
topic,
Utils.getFixedDouble(bsd.getStatsMinute().getSum())
);
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
//log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s, %s", topic, masterAddr, ex.getErrorMessage()));
} else {
log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s", topic, masterAddr), ex);
}
} catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s", topic, masterAddr), ex);
}
}
}
GroupList groupList = null;
try {
groupList = mqAdminExt.queryTopicConsumeByWho(topic);
} catch (Exception ex) {
//log.error(String.format("collectBrokerStatsTopic-fetch consumers for topic(%s) error, ignore this topic", topic), ex);
continue;
}
if (groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
//log.warn(String.format("collectBrokerStatsTopic-topic's consumer is empty, %s", topic));
continue;
}
for (String group : groupList.getGroupList()) {
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
String statsKey = String.format("%s@%s", topic, group);
BrokerStatsData bsd = null;
try {
//how many messages the consumer has get for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
metricsService.getCollector().addGroupGetNumsMetric(
bd.getCluster(),
bd.getBrokerName(),
topic,
group,
Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
//log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
} else {
log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s", topic, group, masterAddr), ex);
}
} catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s", topic, group, masterAddr), ex);
}
try {
//how many bytes the consumer has get for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
metricsService.getCollector().addGroupGetSizeMetric(
bd.getCluster(),
bd.getBrokerName(),
topic,
group,
Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
// log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
} else {
log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
} catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
try {
////how many re-send times the consumer did for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
metricsService.getCollector().addSendBackNumsMetric(
bd.getCluster(),
bd.getBrokerName(),
topic,
group,
bsd.getStatsMinute().getSum());
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
//log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
} else {
log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
} catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
}
}
}
}
log.info("broker topic stats collection task finished...." + (System.currentTimeMillis() - start));
}