in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java [73:109]
/**
 * Reloads the first-topic set, the per-topic wildcard cache and the connect-node set
 * from the KV config stored under {@code RMQ_NAMESPACE}.
 *
 * <p>If the {@code KEY_LMQ_ALL_FIRST_TOPICS} entry is {@code null} the method returns
 * without touching any cached state. Otherwise new collections are built locally and
 * assigned to the fields only after every topic has been processed, so a failure part
 * way through leaves the previously cached values in place.
 *
 * <p>A first topic whose wildcard entry is missing (either the lookup fails with
 * {@code ResponseCode.QUERY_NOT_FOUND} or the returned value is {@code null}) is kept in
 * the first-topic set but gets no entry in the wildcard cache.
 *
 * <p>The connect-node set is replaced only when the {@code KEY_LMQ_CONNECT_NODES} value
 * is not blank; otherwise the existing set is left as is.
 *
 * @throws RemotingException    propagated from the KV config lookups
 * @throws InterruptedException propagated from the KV config lookups
 * @throws MQClientException    propagated from the KV config lookups, except for a
 *                              {@code QUERY_NOT_FOUND} on a per-topic wildcard lookup
 */
private void refreshMeta() throws RemotingException, InterruptedException, MQClientException {
    String value = defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE, KEY_LMQ_ALL_FIRST_TOPICS);
    if (value == null) {
        return;
    }
    String[] topics = value.split(VALUE_SPLITTER);
    Set<String> tmpFirstTopics = new HashSet<>();
    Map<String, Set<String>> tmpWildcardCache = new ConcurrentHashMap<>();
    for (String topic : topics) {
        tmpFirstTopics.add(topic);
        try {
            String wildcardValue = defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE, topic);
            if (wildcardValue == null) {
                // Same outcome as QUERY_NOT_FOUND below: no wildcards recorded for this topic.
                // Without this guard a null value would abort the whole refresh with an NPE.
                continue;
            }
            String[] wildcards = wildcardValue.split(VALUE_SPLITTER);
            Set<String> tmpWildcards = new HashSet<>();
            for (String wildcard : wildcards) {
                tmpWildcards.add(TopicUtils.normalizeTopic(wildcard));
            }
            tmpWildcardCache.put(topic, tmpWildcards);
        } catch (MQClientException e) {
            // A topic without a wildcard entry is not an error; anything else is.
            if (ResponseCode.QUERY_NOT_FOUND == e.getResponseCode()) {
                continue;
            }
            throw e;
        }
    }
    // Swap in the freshly built collections only after all topics were processed.
    firstTopics = tmpFirstTopics;
    wildcardCache = tmpWildcardCache;

    value = defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE, KEY_LMQ_CONNECT_NODES);
    if (StringUtils.isNotBlank(value)) {
        String[] ss = StringUtils.split(value, VALUE_SPLITTER);
        Set<String> set = new HashSet<>();
        for (String s : ss) {
            set.add(s);
        }
        connectNodeSet = set;
    }
}