public static void createNormalTopic()

in core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java [52:94]


    /**
     * Creates a readable and writable topic on every master broker of the given clusters.
     *
     * <p>Nothing is done when {@code check(mqAdmin, topicName)} reports that the topic already exists.
     * Otherwise, for each cluster, {@code totalQueueNum} is divided evenly among the cluster's master
     * brokers; when it does not divide evenly, the first master (in the set's iteration order)
     * additionally receives the remainder, so the queues created in one cluster always sum to
     * {@code totalQueueNum}. Note that the full {@code totalQueueNum} is applied to <em>each</em> cluster.
     *
     * @param mqAdmin       admin client used to look up master brokers and to create the topic
     * @param topicName     name of the topic to create
     * @param totalQueueNum number of read/write queues to create per cluster
     * @param clusters      target cluster names; when {@code null} or empty, the clusters returned by
     *                      {@code getCluster(mqAdmin)} are used
     * @throws IllegalStateException if a cluster has no master broker to host the topic
     * @throws Exception             if looking up brokers or creating the topic fails
     */
    public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int totalQueueNum, Set<String> clusters) throws Exception {
        if (check(mqAdmin, topicName)) {
            logger.info("topic[{}] already exist.", topicName);
            return;
        }

        if (clusters == null || clusters.isEmpty()) {
            clusters = getCluster(mqAdmin);
        }

        for (String cluster : clusters) {
            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster);
            if (masterSet == null || masterSet.isEmpty()) {
                // Fail with a clear message instead of an ArithmeticException from the division below.
                throw new IllegalStateException(
                        String.format("can not create topic:%s, no master broker found in cluster:%s", topicName, cluster));
            }

            int masterNum = masterSet.size();
            int queueNumInEachBroker = totalQueueNum / masterNum;
            int remainder = totalQueueNum % masterNum;
            if (remainder != 0) {
                // The topic is still created; the queues are just not spread evenly (see below).
                String temp = String.format("can not create topic:%s, total num=%s, master num=%s", topicName, totalQueueNum, masterNum);
                logger.warn(temp);
            }

            int perm = PermName.PERM_READ | PermName.PERM_WRITE;

            // The first master absorbs the remainder; every other master gets the even share.
            // With remainder == 0 all masters receive the same queue count.
            boolean firstMaster = true;
            for (String addr : masterSet) {
                int queueNum = firstMaster ? queueNumInEachBroker + remainder : queueNumInEachBroker;
                firstMaster = false;

                TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum, perm);
                mqAdmin.createAndUpdateTopicConfig(addr, topicConfig);
                logger.info("create topic to broker:{} cluster:{}, success.", addr, cluster);
            }
        }
    }