public void buildRoute()

in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java [207:266]


    public void buildRoute() {
        List<Pattern> patterns = new ArrayList<Pattern>();
        String srcCluster = this.replicatorConfig.getSrcCluster();
        try {
            Set<String> targetTopicSet = fetchTargetTopics();
            for (String topic : this.replicatorConfig.getWhiteList()) {
                Pattern pattern = Pattern.compile(topic);
                patterns.add(pattern);
            }

            TopicList topics = srcMQAdminExt.fetchAllTopicList();
            for (String topic : topics.getTopicList()) {
                if (topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
                    continue;
                }

                if (!syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    continue;
                }

                if (!syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
                    continue;
                }

                for (Pattern pattern : patterns) {
                    Matcher matcher = pattern.matcher(topic);
                    if (matcher.matches()) {
                        String targetTopic = generateTargetTopic(topic);
                        if (!targetTopicSet.contains(targetTopic)) {
                            ensureTargetTopic(topic, targetTopic);
                        }

                        // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
                        // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
                        // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
                        List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
                        Set<String> brokerNameSet = new HashSet<String>();
                        for (BrokerData b : brokerList) {
                            brokerNameSet.add(b.getBrokerName());
                        }

                        TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
                        if (!topicRouteMap.containsKey(topic)) {
                            topicRouteMap.put(topic, new HashSet<>(16));
                        }
                        for (QueueData qd : topicRouteData.getQueueDatas()) {
                            if (brokerNameSet.contains(qd.getBrokerName())) {
                                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                                    TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
                                    topicRouteMap.get(topic).add(taskTopicInfo);
                                }
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Fetch topic list error.", e);
        }
    }