in core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java [123:144]
public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String topicName, int totalQueueNum, Set<String> clusters) throws Exception {
if (check(mqAdmin, topicName)) {
logger.info("topic[{}] already exist.", topicName);
return;
}
if (clusters == null || clusters.size() == 0) {
clusters = getCluster(mqAdmin);
}
for (String cluster : clusters) {
createStaticTopicWithCommand(topicName, totalQueueNum, new HashSet<>(), cluster, mqAdmin.getNamesrvAddr());
logger.info("【step 1】create static topic:[{}] in cluster:[{}] success, logic queue num:[{}].", topicName, cluster, totalQueueNum);
update2CompactTopicWithCommand(topicName, totalQueueNum, cluster, mqAdmin.getNamesrvAddr());
logger.info("【step 2】update static topic to compact topic success. topic:[{}], cluster:[{}]", topicName, cluster);
}
existTopic.add(topicName);
logger.info("create static-compact topic [{}] success, queue num [{}]", topicName, totalQueueNum);
}