in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java [273:324]
private void loadCache(boolean isFirst, String firstTopic, Queue queue, QueueOffset queueOffset, int count,
QueueEvent event) {
loadStatus.put(queue, true);
CacheEntry cacheEntry = cache.getIfPresent(queue);
if (cacheEntry == null) {
cacheEntry = new CacheEntry();
cache.put(queue, cacheEntry);
}
cacheEntry.startLoad();
CacheEntry finalCacheEntry = cacheEntry;
CompletableFuture<PullResult> result;
if (isFirst) {
result = lmqQueueStore.pullLastMessages(firstTopic, queue, count);
} else {
result = lmqQueueStore.pullMessage(firstTopic, queue, queueOffset, count);
}
result.whenComplete((pullResult, throwable) -> {
if (throwable != null) {
logger.error("", throwable);
loadEvent.remove(queue, event);
loadStatus.remove(queue);
finalCacheEntry.endLoad();
addLoadEvent(queue, event.session);
return;
}
try {
if (pullResult != null && !CollectionUtils.isEmpty(pullResult.getMessageList())) {
synchronized (finalCacheEntry) {
finalCacheEntry.messageList.addAll(pullResult.getMessageList());
if (isFirst) {
Collections.sort(finalCacheEntry.messageList, Comparator.comparingLong(Message::getOffset));
}
int overNum = finalCacheEntry.messageList.size() - connectConf.getQueueCacheSize();
for (int i = 0; i < overNum; i++) {
finalCacheEntry.messageList.remove(0);
}
}
if (pullResult.getMessageList().size() >= count && !isFirst) {
addLoadEvent(queue, event.session);
return;
}
}
} catch (Exception e) {
logger.error("loadCache failed {}", queue.getQueueName(), e);
addLoadEvent(queue, event.session);
} finally {
loadEvent.remove(queue, event);
loadStatus.remove(queue);
finalCacheEntry.endLoad();
}
});
}