in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/notify/MessageNotifyAction.java [55:80]
/**
 * Dispatches pull notifications for a batch of message events.
 *
 * <p>For every event, the (session, subscription) pairs returned by
 * {@code matchAction.matchClients} for the event's normalized publish topic
 * and namespace are visited. The queues of each pair are obtained through
 * {@code queueFresh.freshQueue}; for each queue that {@code isTargetQueue}
 * accepts for the event, the queue cache entry is refreshed and the session
 * loop is asked to notify a message pull.
 *
 * @param events the events to process; a {@code null} or empty list is a no-op
 */
public void notify(List<MessageEvent> events) {
    // Nothing to dispatch for a missing or empty batch.
    if (events == null || events.isEmpty()) {
        return;
    }

    for (MessageEvent messageEvent : events) {
        Set<Pair<Session, Subscription>> matchedClients = matchAction.matchClients(
                TopicUtils.normalizeTopic(messageEvent.getPubTopic()), messageEvent.getNamespace());
        // No matching clients were returned for this event.
        if (matchedClients == null || matchedClients.isEmpty()) {
            continue;
        }

        for (Pair<Session, Subscription> matched : matchedClients) {
            Session targetSession = matched.getLeft();
            Subscription targetSubscription = matched.getRight();

            Set<Queue> queues = queueFresh.freshQueue(targetSession, targetSubscription);
            if (queues == null || queues.isEmpty()) {
                continue;
            }

            for (Queue candidate : queues) {
                // Skip queues that isTargetQueue rejects for this event.
                if (!isTargetQueue(candidate, messageEvent)) {
                    continue;
                }
                // Refresh the cache entry before asking the session loop to pull.
                queueCache.refreshCache(Pair.of(candidate, targetSession));
                sessionLoop.notifyPullMessage(targetSession, targetSubscription, candidate);
            }
        }
    }
}