in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java [118:148]
private void refresh() throws MQClientException {
Set<String> tmp = metaPersistManager.getAllFirstTopics();
if (tmp == null || tmp.isEmpty()) {
return;
}
Set<String> thisTopicList = new HashSet<>();
for (String topic : tmp) {
try {
if (topic.equals(serviceConf.getClientRetryTopic())) {
// notify by RetryDriver self
continue;
}
firstTopicManager.checkFirstTopicIfCreated(topic);
thisTopicList.add(topic);
if (!topics.contains(topic)) {
subscribe(topic);
topics.add(topic);
}
} catch (TopicNotExistException e) {
logger.error("", e);
}
}
Iterator<String> iterator = topics.iterator();
while (iterator.hasNext()) {
String topic = iterator.next();
if (!thisTopicList.contains(topic)) {
iterator.remove();
unsubscribe(topic);
}
}
}