public PullResultStatus pullMessage()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java [181:263]


    public PullResultStatus pullMessage(Session session, Subscription subscription, Queue queue,
                                        QueueOffset queueOffset, int count,
                                        CompletableFuture<PullResult> callBackResult) {
        if (subscription.isP2p() || subscription.isRetry()) {
            StatUtil.addPv("NotPullCache", 1);
            collectorPullCacheStatus("NotPullCache");
            CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
            callbackResult(pullResult, callBackResult);
            return DONE;
        }
        CacheEntry cacheEntry = cache.getIfPresent(queue);
        if (cacheEntry == null) {
            StatUtil.addPv("NoPullCache", 1);
            collectorPullCacheStatus("NotPullCache");
            CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
            callbackResult(pullResult, callBackResult);
            return DONE;
        }
        if (cacheEntry.loading.get()) {
            if (System.currentTimeMillis() - cacheEntry.startLoadingT > 1000) {
                StatUtil.addPv("LoadPullCacheTimeout", 1);
                collectorPullCacheStatus("LoadPullCacheTimeout");
                CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
                callbackResult(pullResult, callBackResult);
                return DONE;
            }
            return LATER;
        }

        List<Message> cacheMsgList = cacheEntry.messageList;
        if (cacheMsgList.isEmpty()) {
            if (loadEvent.get(queue) != null) {
                collectorPullCacheStatus("EmptyPullCacheLATER");
                StatUtil.addPv("EmptyPullCacheLATER", 1);
                return LATER;
            }
            StatUtil.addPv("EmptyPullCache", 1);
            collectorPullCacheStatus("EmptyPullCache");
            CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
            callbackResult(pullResult, callBackResult);
            return DONE;
        }

        if (queueOffset.getOffset() < cacheMsgList.get(0).getOffset()) {
            StatUtil.addPv("OutPullCache", 1);
            collectorPullCacheStatus("OutPullCache");
            CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
            callbackResult(pullResult, callBackResult);
            return DONE;
        }
        List<Message> resultMsgs = new ArrayList<>();
        synchronized (cacheEntry) {
            for (Message message : cacheMsgList) {
                if (message.getOffset() >= queueOffset.getOffset()) {
                    resultMsgs.add(message);
                }
                if (resultMsgs.size() >= count) {
                    break;
                }
            }
        }
        if (resultMsgs.isEmpty()) {
            if (loadEvent.get(queue) != null) {
                StatUtil.addPv("PullCacheLATER", 1);
                collectorPullCacheStatus("PullCacheLATER");
                return LATER;
            }
            StatUtil.addPv("OutPullCache2", 1);
            collectorPullCacheStatus("OutPullCache2");
            CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
            callbackResult(pullResult, callBackResult);
            return DONE;
        }
        PullResult pullResult = new PullResult();
        pullResult.setMessageList(resultMsgs);
        callBackResult.complete(pullResult);
        StatUtil.addPv("PullFromCache", 1);
        collectorPullCacheStatus("PullFromCache");
        if (loadEvent.get(queue) != null) {
            return LATER;
        }
        return DONE;
    }