in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java [181:263]
/**
 * Serves a pull request for {@code queue}, answering from the in-memory cache when the
 * requested offset is covered by it and falling back to {@code lmqQueueStore} otherwise.
 *
 * <p>Decision order:
 * <ol>
 *   <li>P2P or retry subscription: always pull from the store.</li>
 *   <li>No cache entry for the queue: pull from the store.</li>
 *   <li>Cache entry still loading: return {@code LATER} without completing
 *       {@code callBackResult}, unless loading started more than 1000 ms ago, in which
 *       case pull from the store.</li>
 *   <li>Cache empty, or no cached message at/after the requested offset: return
 *       {@code LATER} (future left uncompleted) if {@code loadEvent} holds an entry for
 *       the queue, otherwise pull from the store.</li>
 *   <li>Requested offset lower than the first cached message's offset: pull from the
 *       store.</li>
 *   <li>Otherwise complete {@code callBackResult} with up to {@code count} cached
 *       messages whose offset is at or after the requested offset.</li>
 * </ol>
 *
 * <p>Every branch records its outcome through {@code StatUtil.addPv} and
 * {@code collectorPullCacheStatus} under the same key.
 *
 * @param session        the requesting session; not read by this method
 * @param subscription   subscription being served; selects the store topic and the
 *                       P2P/retry bypass
 * @param queue          queue to pull from; key into {@code cache} and {@code loadEvent}
 * @param queueOffset    offset from which messages are wanted
 * @param count          maximum number of messages to return
 * @param callBackResult future that receives the pull result; left uncompleted on the
 *                       {@code LATER} branches of steps 3 and 4
 * @return {@code DONE} when the request was answered from the store, or from the cache
 *         with no entry in {@code loadEvent} for the queue; {@code LATER} otherwise.
 *         NOTE(review): {@code LATER} presumably asks the caller to pull again once the
 *         pending load finishes - confirm against the caller.
 */
public PullResultStatus pullMessage(Session session, Subscription subscription, Queue queue,
                                    QueueOffset queueOffset, int count,
                                    CompletableFuture<PullResult> callBackResult) {
    if (subscription.isP2p() || subscription.isRetry()) {
        return pullDirectlyFromStore("NotPullCache", subscription, queue, queueOffset, count, callBackResult);
    }

    CacheEntry cacheEntry = cache.getIfPresent(queue);
    if (cacheEntry == null) {
        // Same key for both metric sinks; the collector used to receive "NotPullCache"
        // here, which made this branch indistinguishable from the P2P/retry bypass.
        return pullDirectlyFromStore("NoPullCache", subscription, queue, queueOffset, count, callBackResult);
    }

    if (cacheEntry.loading.get()) {
        // Do not wait on a load that has been running for more than one second.
        if (System.currentTimeMillis() - cacheEntry.startLoadingT > 1000) {
            return pullDirectlyFromStore("LoadPullCacheTimeout", subscription, queue, queueOffset, count,
                callBackResult);
        }
        return LATER;
    }

    // Take one consistent snapshot of the cached list under the entry's monitor. The
    // empty check and the get(0) head check used to run outside this lock, so a
    // concurrent change to the list could make get(0) throw or yield stale decisions.
    // NOTE(review): assumes writers of messageList also synchronize on cacheEntry, as the
    // original synchronized scan implies - confirm against the cache refresh code.
    boolean cacheEmpty;
    boolean offsetBeforeCache = false;
    List<Message> resultMsgs = new ArrayList<>();
    synchronized (cacheEntry) {
        List<Message> cacheMsgList = cacheEntry.messageList;
        cacheEmpty = cacheMsgList.isEmpty();
        if (!cacheEmpty) {
            offsetBeforeCache = queueOffset.getOffset() < cacheMsgList.get(0).getOffset();
        }
        if (!cacheEmpty && !offsetBeforeCache) {
            for (Message message : cacheMsgList) {
                if (message.getOffset() >= queueOffset.getOffset()) {
                    resultMsgs.add(message);
                }
                if (resultMsgs.size() >= count) {
                    break;
                }
            }
        }
    }

    // Store access and metric recording stay outside the lock.
    if (cacheEmpty) {
        if (loadEvent.get(queue) != null) {
            markPullCacheStatus("EmptyPullCacheLATER");
            return LATER;
        }
        return pullDirectlyFromStore("EmptyPullCache", subscription, queue, queueOffset, count, callBackResult);
    }

    if (offsetBeforeCache) {
        // The requested offset precedes everything the cache holds.
        return pullDirectlyFromStore("OutPullCache", subscription, queue, queueOffset, count, callBackResult);
    }

    if (resultMsgs.isEmpty()) {
        // The requested offset is past everything the cache holds.
        if (loadEvent.get(queue) != null) {
            markPullCacheStatus("PullCacheLATER");
            return LATER;
        }
        return pullDirectlyFromStore("OutPullCache2", subscription, queue, queueOffset, count, callBackResult);
    }

    PullResult pullResult = new PullResult();
    pullResult.setMessageList(resultMsgs);
    callBackResult.complete(pullResult);
    markPullCacheStatus("PullFromCache");
    // The future is already completed here; LATER is still returned when loadEvent
    // holds an entry for the queue.
    if (loadEvent.get(queue) != null) {
        return LATER;
    }
    return DONE;
}

/**
 * Records {@code statusKey}, pulls from {@code lmqQueueStore} without consulting the
 * cache, and forwards the store's result to {@code callBackResult}.
 *
 * @return always {@code DONE}
 */
private PullResultStatus pullDirectlyFromStore(String statusKey, Subscription subscription, Queue queue,
                                               QueueOffset queueOffset, int count,
                                               CompletableFuture<PullResult> callBackResult) {
    markPullCacheStatus(statusKey);
    CompletableFuture<PullResult> pullResult =
        lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
    callbackResult(pullResult, callBackResult);
    return DONE;
}

/** Reports one occurrence of {@code statusKey} to both StatUtil and the pull-cache status collector. */
private void markPullCacheStatus(String statusKey) {
    StatUtil.addPv(statusKey, 1);
    collectorPullCacheStatus(statusKey);
}