public void notify()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/notify/MessageNotifyAction.java [55:80]


    public void notify(List<MessageEvent> events) {
        if (events == null || events.isEmpty()) {
            return;
        }
        for (MessageEvent event : events) {
            Set<Pair<Session, Subscription>> result = matchAction.matchClients(
                    TopicUtils.normalizeTopic(event.getPubTopic()), event.getNamespace());
            if (result == null || result.isEmpty()) {
                continue;
            }
            for (Pair<Session, Subscription> pair : result) {
                Session session = pair.getLeft();
                Subscription subscription = pair.getRight();
                Set<Queue> set = queueFresh.freshQueue(session, subscription);
                if (set == null || set.isEmpty()) {
                    continue;
                }
                for (Queue queue : set) {
                    if (isTargetQueue(queue, event)) {
                        queueCache.refreshCache(Pair.of(queue, session));
                        sessionLoop.notifyPullMessage(session, subscription, queue);
                    }
                }
            }
        }
    }