in src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java [124:153]
public TopicTypeList examineAllTopicType() {
List<String> messageTypes = new ArrayList<>();
List<String> names = new ArrayList<>();
ClusterInfo clusterInfo = clusterInfoService.get();
TopicList sysTopics = getSystemTopicList();
clusterInfo.getBrokerAddrTable().values().forEach(brokerAddr -> {
try {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = mqAdminExt.getAllTopicConfig(brokerAddr.getBrokerAddrs().get(0L), 10000L);
for (TopicConfig topicConfig : topicConfigSerializeWrapper.getTopicConfigTable().values()) {
TopicTypeMeta topicType = classifyTopicType(topicConfig.getTopicName(), topicConfigSerializeWrapper.getTopicConfigTable().get(topicConfig.getTopicName()).getAttributes(),sysTopics.getTopicList());
if (names.contains(topicType.getTopicName())) {
continue;
}
names.add(topicType.getTopicName());
messageTypes.add(topicType.getMessageType());
}
} catch (Exception e) {
logger.warn("Failed to classify topic type for broker: " + brokerAddr, e);
}
});
sysTopics.getTopicList().forEach(topicName -> {
String sysTopicName = String.format("%s%s", "%SYS%", topicName);
if (!names.contains(sysTopicName)) {
names.add(sysTopicName);
messageTypes.add("SYSTEM");
}
});
return new TopicTypeList(names, messageTypes);
}