private void loadCache()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java [273:324]


    private void loadCache(boolean isFirst, String firstTopic, Queue queue, QueueOffset queueOffset, int count,
                           QueueEvent event) {
        loadStatus.put(queue, true);
        CacheEntry cacheEntry = cache.getIfPresent(queue);
        if (cacheEntry == null) {
            cacheEntry = new CacheEntry();
            cache.put(queue, cacheEntry);
        }
        cacheEntry.startLoad();
        CacheEntry finalCacheEntry = cacheEntry;
        CompletableFuture<PullResult> result;
        if (isFirst) {
            result = lmqQueueStore.pullLastMessages(firstTopic, queue, count);
        } else {
            result = lmqQueueStore.pullMessage(firstTopic, queue, queueOffset, count);
        }
        result.whenComplete((pullResult, throwable) -> {
            if (throwable != null) {
                logger.error("", throwable);
                loadEvent.remove(queue, event);
                loadStatus.remove(queue);
                finalCacheEntry.endLoad();
                addLoadEvent(queue, event.session);
                return;
            }
            try {
                if (pullResult != null && !CollectionUtils.isEmpty(pullResult.getMessageList())) {
                    synchronized (finalCacheEntry) {
                        finalCacheEntry.messageList.addAll(pullResult.getMessageList());
                        if (isFirst) {
                            Collections.sort(finalCacheEntry.messageList, Comparator.comparingLong(Message::getOffset));
                        }
                        int overNum = finalCacheEntry.messageList.size() - connectConf.getQueueCacheSize();
                        for (int i = 0; i < overNum; i++) {
                            finalCacheEntry.messageList.remove(0);
                        }
                    }
                    if (pullResult.getMessageList().size() >= count && !isFirst) {
                        addLoadEvent(queue, event.session);
                        return;
                    }
                }
            } catch (Exception e) {
                logger.error("loadCache failed {}", queue.getQueueName(), e);
                addLoadEvent(queue, event.session);
            } finally {
                loadEvent.remove(queue, event);
                loadStatus.remove(queue);
                finalCacheEntry.endLoad();
            }
        });
    }