public void notifyPullMessage()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [361:412]


    public void notifyPullMessage(Session session, Subscription subscription, Queue queue) {
        if (session == null || session.isDestroyed()) {
            return;
        }
        if (subscriptionPersistManager == null) {
            subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
        }
        if (subscriptionPersistManager != null &&
                !session.isClean() && !session.isLoaded()) {
            if (session.isLoading()) {
                return;
            }
            session.setLoading();
            CompletableFuture<Set<Subscription>> future = subscriptionPersistManager.loadSubscriptions(session.getClientId());
            future.whenComplete((subscriptions, throwable) -> {
                if (throwable != null) {
                    logger.error("", throwable);
                    scheduler.schedule(() -> {
                        session.resetLoad();
                        notifyPullMessage(session, subscription, queue);
                    }, 3, TimeUnit.SECONDS);
                    return;
                }
                session.addSubscription(subscriptions);
                matchAction.addSubscription(session, subscriptions);
                session.setLoaded();
                notifyPullMessage(session, subscription, queue);
            });
            return;
        }
        if (queue != null) {
            if (subscription == null) {
                throw new RuntimeException(
                        "invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
            }
            queueFresh.freshQueue(session, subscription);
            pullMessage(session, subscription, queue);
            return;
        }
        for (Subscription each : session.subscriptionSnapshot()) {
            if (subscription != null && !each.equals(subscription)) {
                continue;
            }
            queueFresh.freshQueue(session, each);
            Map<Queue, QueueOffset> queueOffsets = session.getQueueOffset(each);
            if (queueOffsets != null) {
                for (Map.Entry<Queue, QueueOffset> entry : queueOffsets.entrySet()) {
                    pullMessage(session, each, entry.getKey());
                }
            }
        }
    }