in core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java [96:120]
/**
 * Creates {@code stateTopic} on every broker that serves {@code sourceTopic}, using the same
 * read/write queue counts that {@code sourceTopic} has on that broker, and then records
 * {@code stateTopic} in {@code existTopic} (declared elsewhere in this class).
 *
 * @param mqAdmin     admin client used to look up the route of {@code sourceTopic} and to create
 *                    the new topic
 * @param sourceTopic topic whose route (brokers and queue counts) is copied
 * @param stateTopic  name of the topic to create
 * @throws IllegalStateException if a broker listed in the route's queue data has no address
 *                               registered under broker id {@code 0}
 * @throws Exception             if the route lookup or a topic creation request fails
 */
public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String sourceTopic, String stateTopic) throws Exception {
    // Look up which brokers serve the source topic and how many queues each one has.
    TopicRouteData topicRouteData = mqAdmin.examineTopicRouteInfo(sourceTopic);
    List<QueueData> queueData = topicRouteData.getQueueDatas();
    List<BrokerData> brokerData = topicRouteData.getBrokerDatas();

    // brokerName -> address registered under broker id 0 (used here as the master address).
    HashMap<String, String> brokerName2MasterBrokerAddr = new HashMap<>();
    for (BrokerData broker : brokerData) {
        String masterBrokerAddr = broker.getBrokerAddrs().get(0L);
        if (masterBrokerAddr != null) {
            brokerName2MasterBrokerAddr.put(broker.getBrokerName(), masterBrokerAddr);
        }
    }

    // Validate every target address before creating anything, so a missing master is reported
    // clearly and the state topic is not left created on only some of the brokers.
    // NOTE(review): a null address handed to the admin client is not expected to reach the
    // intended broker - confirm against the remoting client's handling of null addresses.
    for (QueueData queue : queueData) {
        String brokerName = queue.getBrokerName();
        if (brokerName2MasterBrokerAddr.get(brokerName) == null) {
            throw new IllegalStateException("no master address found for broker [" + brokerName
                    + "] while creating topic [" + stateTopic + "] from source topic [" + sourceTopic + "]");
        }
    }

    for (QueueData queue : queueData) {
        int readQueueNums = queue.getReadQueueNums();
        int writeQueueNums = queue.getWriteQueueNums();
        String masterBrokerAddr = brokerName2MasterBrokerAddr.get(queue.getBrokerName());
        TopicConfig topicConfig = new TopicConfig(stateTopic, readQueueNums, writeQueueNums);
        mqAdmin.createAndUpdateTopicConfig(masterBrokerAddr, topicConfig);
    }

    // Only mark the topic as existing once every broker accepted the create request.
    existTopic.add(stateTopic);
}