public static void createNormalTopic()

in core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java [96:120]


    /**
     * Creates {@code stateTopic} on each broker that serves {@code sourceTopic}, copying the
     * source topic's per-broker read and write queue counts, and then records
     * {@code stateTopic} in {@code existTopic}.
     *
     * @param mqAdmin     admin client used to read the route of {@code sourceTopic} and to create the topic
     * @param sourceTopic topic whose route (brokers and queue counts) is used as the template
     * @param stateTopic  name of the topic to create
     * @throws IllegalStateException if a broker listed in the route has no address registered under
     *                               broker id 0, so there is nowhere to send the create request
     * @throws Exception             any failure raised by the admin client while reading the route
     *                               or creating the topic
     */
    public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String sourceTopic, String stateTopic) throws Exception {
        // Look up which brokers serve the source topic and how its queues are laid out.
        TopicRouteData topicRouteData = mqAdmin.examineTopicRouteInfo(sourceTopic);
        List<QueueData> queueData = topicRouteData.getQueueDatas();
        List<BrokerData> brokerData = topicRouteData.getBrokerDatas();

        // brokerName -> address registered under broker id 0 (the master id in RocketMQ).
        HashMap<String, String> masterAddrByBrokerName = new HashMap<>();
        for (BrokerData broker : brokerData) {
            String masterBrokerAddr = broker.getBrokerAddrs().get(0L);
            if (masterBrokerAddr != null) {
                masterAddrByBrokerName.put(broker.getBrokerName(), masterBrokerAddr);
            }
        }

        for (QueueData queue : queueData) {
            String brokerName = queue.getBrokerName();
            String masterBrokerAddr = masterAddrByBrokerName.get(brokerName);
            if (masterBrokerAddr == null) {
                // Fail with a clear message instead of handing a null address to the admin client.
                throw new IllegalStateException("no master address found for broker [" + brokerName
                        + "] while creating topic [" + stateTopic + "] from source topic [" + sourceTopic + "]");
            }

            // Mirror the source topic's queue counts on this broker.
            TopicConfig topicConfig = new TopicConfig(stateTopic, queue.getReadQueueNums(), queue.getWriteQueueNums());
            mqAdmin.createAndUpdateTopicConfig(masterBrokerAddr, topicConfig);
        }

        // Only reached when every create call above returned without throwing.
        existTopic.add(stateTopic);
    }