private void doPull()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [499:559]


    /**
     * Handles a single pull event: asks the queue cache for the next batch of
     * messages for the event's session/subscription/queue and reacts to the
     * asynchronous result.
     *
     * <p>Flow, as implemented below:
     * <ul>
     *   <li>If the session is destroyed or has no offset registered for this
     *       subscription/queue, the pull status is cleared and nothing is pulled.</li>
     *   <li>If the offset is not yet initialized, initialization is triggered and
     *       a new pull is scheduled after {@code pullIntervalMillis}.</li>
     *   <li>Otherwise the queue is flagged in {@code pullStatus} and
     *       {@code queueCache.pullMessage} is invoked with a future whose
     *       completion callback dispatches messages, follows a moved offset, or
     *       schedules a retry on failure.</li>
     * </ul>
     *
     * @param pullEvent the event carrying the session, subscription and queue to pull for
     */
    private void doPull(PullEvent pullEvent) {
        Session session = pullEvent.session;
        Subscription subscription = pullEvent.subscription;
        Queue queue = pullEvent.queue;
        QueueOffset queueOffset = session.getQueueOffset(subscription, queue);
        // Nothing to pull for a destroyed session or an unknown subscription/queue pair.
        if (session.isDestroyed() || queueOffset == null) {
            clearPullStatus(session, queue, pullEvent);
            return;
        }

        // The offset must be initialized before pulling; kick that off and try again later.
        if (!queueOffset.isInitialized()) {
            initOffset(session, subscription, queue, queueOffset, null, null);
            scheduler.schedule(() -> pullMessage(session, subscription, queue), pullIntervalMillis, TimeUnit.MILLISECONDS);
            return;
        }

        // Flag this session/queue as having a pull in progress
        // (presumably reset by clearPullStatus -- confirm against that method).
        pullStatus.put(eventQueueKey(session, queue), true);
        // A positive session-level pull size takes precedence over the configured batch size.
        int count = session.getPullSize() > 0 ? session.getPullSize() : connectConf.getPullBatchSize();
        CompletableFuture<PullResult> result = new CompletableFuture<>();
        result.whenComplete((pullResult, throwable) -> {
            if (throwable != null) {
                // Pull failed: clear the status, log, and retry after one second
                // unless the session has been destroyed in the meantime.
                clearPullStatus(session, queue, pullEvent);
                logger.error("{}", session.getClientId(), throwable);
                if (session.isDestroyed()) {
                    return;
                }
                scheduler.schedule(() -> pullMessage(session, subscription, queue), 1, TimeUnit.SECONDS);
                return;
            }
            try {
                if (session.isDestroyed()) {
                    return;
                }
                if (PullResult.PULL_SUCCESS == pullResult.getCode()) {
                    // A full batch (size >= requested count) triggers a follow-up pull;
                    // presumably more messages may still be waiting in the queue.
                    if (pullResult.getMessageList() != null &&
                            pullResult.getMessageList().size() >= count) {
                        scheduler.schedule(() -> pullMessage(session, subscription, queue), pullIntervalMillis, TimeUnit.MILLISECONDS);
                    }
                    // Only notify the push action when the session reports the messages were added.
                    boolean add = session.addSendingMessages(subscription, queue, pullResult.getMessageList());
                    if (add) {
                        pushAction.messageArrive(session, subscription, queue);
                    }
                } else if (PullResult.PULL_OFFSET_MOVED == pullResult.getCode()) {
                    // Adopt the offset reported by the pull result, flag it for
                    // persistence, and pull again from the new position.
                    queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
                    session.markPersistOffsetFlag(true);
                    pullMessage(session, subscription, queue);
                } else {
                    logger.error("response:{},{}", session.getClientId(), JSONObject.toJSONString(pullResult));
                }
            } finally {
                // Always clear the pull status once a non-failed result has been handled.
                clearPullStatus(session, queue, pullEvent);
            }
        });

        PullResultStatus pullResultStatus = queueCache.pullMessage(session, subscription, queue, queueOffset, count, result);
        // LATER: the cache did not serve this pull now; clear the status and reschedule.
        if (PullResultStatus.LATER.equals(pullResultStatus)) {
            clearPullStatus(session, queue, pullEvent);
            scheduler.schedule(() -> pullMessage(session, subscription, queue), pullIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }