in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java [207:266]
public void buildRoute() {
List<Pattern> patterns = new ArrayList<Pattern>();
String srcCluster = this.replicatorConfig.getSrcCluster();
try {
Set<String> targetTopicSet = fetchTargetTopics();
for (String topic : this.replicatorConfig.getWhiteList()) {
Pattern pattern = Pattern.compile(topic);
patterns.add(pattern);
}
TopicList topics = srcMQAdminExt.fetchAllTopicList();
for (String topic : topics.getTopicList()) {
if (topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
continue;
}
if (!syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
continue;
}
if (!syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
continue;
}
for (Pattern pattern : patterns) {
Matcher matcher = pattern.matcher(topic);
if (matcher.matches()) {
String targetTopic = generateTargetTopic(topic);
if (!targetTopicSet.contains(targetTopic)) {
ensureTargetTopic(topic, targetTopic);
}
// different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
// QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
// expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
Set<String> brokerNameSet = new HashSet<String>();
for (BrokerData b : brokerList) {
brokerNameSet.add(b.getBrokerName());
}
TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
if (!topicRouteMap.containsKey(topic)) {
topicRouteMap.put(topic, new HashSet<>(16));
}
for (QueueData qd : topicRouteData.getQueueDatas()) {
if (brokerNameSet.contains(qd.getBrokerName())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
topicRouteMap.get(topic).add(taskTopicInfo);
}
}
}
}
}
}
} catch (Exception e) {
log.error("Fetch topic list error.", e);
}
}