in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [258:272]
void createShuffleTopic() throws Throwable {
Set<String> total = WorkerThread.this.topologyBuilder.getSourceTopic();
List<String> shuffleTopic = new ArrayList<>();
for (String topic : total) {
if (topic.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX)) {
shuffleTopic.add(topic);
}
}
for (String topicName : shuffleTopic) {
RocketMQUtil.createStaticTopic(mqAdmin, topicName, StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM);
}
}