private void refreshMeta()

in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java [73:109]


    private void refreshMeta() throws RemotingException, InterruptedException, MQClientException {
        String value = defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE, KEY_LMQ_ALL_FIRST_TOPICS);
        if (value == null) {
            return;
        }
        String[] topics = value.split(VALUE_SPLITTER);
        Set<String> tmpFirstTopics = new HashSet<>();
        Map<String, Set<String>> tmpWildcardCache = new ConcurrentHashMap<>();
        for (String topic : topics) {
            tmpFirstTopics.add(topic);
            try {
                String wildcardValue = defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE, topic);
                String[] wildcards = wildcardValue.split(VALUE_SPLITTER);
                Set<String> tmpWildcards = new HashSet<>();
                for (String wildcard : wildcards) {
                    tmpWildcards.add(TopicUtils.normalizeTopic(wildcard));
                }
                tmpWildcardCache.put(topic, tmpWildcards);
            } catch (MQClientException e) {
                if (ResponseCode.QUERY_NOT_FOUND == e.getResponseCode()) {
                    continue;
                }
                throw e;
            }
        }
        firstTopics = tmpFirstTopics;
        wildcardCache = tmpWildcardCache;
        value = defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE, KEY_LMQ_CONNECT_NODES);
        if (StringUtils.isNotBlank(value)) {
            String[] ss = StringUtils.split(value, VALUE_SPLITTER);
            Set<String> set = new HashSet<>();
            for (String s : ss) {
                set.add(s);
            }
            connectNodeSet = set;
        }
    }