in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java [106:127]
public void addSubscription(Session session, Set<Subscription> subscriptions) {
String channelId = session.getChannelId();
if (channelId == null || subscriptions == null || subscriptions.isEmpty()) {
return;
}
for (Subscription subscription : subscriptions) {
if (subscription.isRetry() || subscription.isP2p()) {
continue;
}
String topicFilter = subscription.getTopicFilter();
boolean isWildCard = TopicUtils.isWildCard(topicFilter);
if (isWildCard) {
trie.addNode(topicFilter, subscription.getQos(), channelId);
continue;
}
synchronized (topicCache) {
topicCache.putIfAbsent(topicFilter, new HashSet<>());
topicCache.get(topicFilter).add(channelId);
}
}
}