void createShuffleTopic()

in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [258:272]


        /**
         * Creates a static topic for every source topic of the enclosing worker's
         * topology whose name ends with {@link Constant#SHUFFLE_TOPIC_SUFFIX}.
         *
         * <p>All matching names are gathered first; only afterwards is
         * {@code RocketMQUtil.createStaticTopic} invoked once per match with
         * {@code mqAdmin} and {@link StreamConfig#SHUFFLE_TOPIC_QUEUE_NUM}.
         *
         * @throws Throwable whatever the topology lookup or topic creation raises;
         *                   nothing is caught here
         */
        void createShuffleTopic() throws Throwable {
            Set<String> sourceTopics = WorkerThread.this.topologyBuilder.getSourceTopic();

            // Work on a copy and drop every name that lacks the shuffle suffix,
            // so the remaining entries are exactly the shuffle topics.
            List<String> shuffleTopics = new ArrayList<>(sourceTopics);
            shuffleTopics.removeIf(name -> !name.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX));

            for (String shuffleTopicName : shuffleTopics) {
                RocketMQUtil.createStaticTopic(
                        mqAdmin, shuffleTopicName, StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM);
            }
        }