in core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java [52:94]
/**
 * Creates a normal (read/write) topic on every master broker of the given clusters,
 * spreading {@code totalQueueNum} queues across the masters of each cluster.
 *
 * <p>If the topic is already reported as existing by {@code check}, nothing is done.
 * When {@code clusters} is {@code null} or empty, the clusters returned by
 * {@code getCluster(mqAdmin)} are used instead.
 *
 * <p>Within one cluster every master receives {@code totalQueueNum / masterNum} queues;
 * when the division is not exact, the first master in iteration order additionally
 * receives the remainder so that the cluster-wide total equals {@code totalQueueNum}.
 *
 * @param mqAdmin       admin client used to query masters and to create the topic
 * @param topicName     name of the topic to create
 * @param totalQueueNum total number of read/write queues wanted per cluster
 * @param clusters      target cluster names; {@code null} or empty means "all clusters known to mqAdmin"
 * @throws IllegalStateException if a target cluster has no master broker
 * @throws Exception             propagated from the underlying admin calls
 */
public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int totalQueueNum, Set<String> clusters) throws Exception {
    if (check(mqAdmin, topicName)) {
        logger.info("topic[{}] already exist.", topicName);
        return;
    }
    if (clusters == null || clusters.isEmpty()) {
        clusters = getCluster(mqAdmin);
    }

    final int perm = PermName.PERM_READ | PermName.PERM_WRITE;
    for (String cluster : clusters) {
        Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster);
        // Guard against an empty master set: without it the division below fails with an
        // uninformative "/ by zero" ArithmeticException.
        if (masterSet == null || masterSet.isEmpty()) {
            throw new IllegalStateException(
                String.format("can not create topic:%s, no master broker found in cluster:%s", topicName, cluster));
        }

        int masterNum = masterSet.size();
        int queueNumInEachBroker = totalQueueNum / masterNum;
        int remainder = totalQueueNum % masterNum;
        if (remainder != 0) {
            logger.warn("topic:{} total num={} is not divisible by master num={} in cluster:{}, "
                    + "one master will hold {} extra queue(s).",
                topicName, totalQueueNum, masterNum, cluster, remainder);
        }

        // The first master visited takes the remainder; every other master gets the even share.
        // Each master is addressed by its own address (the previous code always targeted the
        // first master in the uneven case, so the other masters never received the topic).
        int extra = remainder;
        for (String addr : masterSet) {
            int queueNum = queueNumInEachBroker + extra;
            extra = 0;
            TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum, perm);
            mqAdmin.createAndUpdateTopicConfig(addr, topicConfig);
            logger.info("create topic to broker:{} cluster:{}, success.", addr, cluster);
        }
    }
}