in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [499:559]
private void doPull(PullEvent pullEvent) {
Session session = pullEvent.session;
Subscription subscription = pullEvent.subscription;
Queue queue = pullEvent.queue;
QueueOffset queueOffset = session.getQueueOffset(subscription, queue);
if (session.isDestroyed() || queueOffset == null) {
clearPullStatus(session, queue, pullEvent);
return;
}
if (!queueOffset.isInitialized()) {
initOffset(session, subscription, queue, queueOffset, null, null);
scheduler.schedule(() -> pullMessage(session, subscription, queue), pullIntervalMillis, TimeUnit.MILLISECONDS);
return;
}
pullStatus.put(eventQueueKey(session, queue), true);
int count = session.getPullSize() > 0 ? session.getPullSize() : connectConf.getPullBatchSize();
CompletableFuture<PullResult> result = new CompletableFuture<>();
result.whenComplete((pullResult, throwable) -> {
if (throwable != null) {
clearPullStatus(session, queue, pullEvent);
logger.error("{}", session.getClientId(), throwable);
if (session.isDestroyed()) {
return;
}
scheduler.schedule(() -> pullMessage(session, subscription, queue), 1, TimeUnit.SECONDS);
return;
}
try {
if (session.isDestroyed()) {
return;
}
if (PullResult.PULL_SUCCESS == pullResult.getCode()) {
if (pullResult.getMessageList() != null &&
pullResult.getMessageList().size() >= count) {
scheduler.schedule(() -> pullMessage(session, subscription, queue), pullIntervalMillis, TimeUnit.MILLISECONDS);
}
boolean add = session.addSendingMessages(subscription, queue, pullResult.getMessageList());
if (add) {
pushAction.messageArrive(session, subscription, queue);
}
} else if (PullResult.PULL_OFFSET_MOVED == pullResult.getCode()) {
queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
session.markPersistOffsetFlag(true);
pullMessage(session, subscription, queue);
} else {
logger.error("response:{},{}", session.getClientId(), JSONObject.toJSONString(pullResult));
}
} finally {
clearPullStatus(session, queue, pullEvent);
}
});
PullResultStatus pullResultStatus = queueCache.pullMessage(session, subscription, queue, queueOffset, count, result);
if (PullResultStatus.LATER.equals(pullResultStatus)) {
clearPullStatus(session, queue, pullEvent);
scheduler.schedule(() -> pullMessage(session, subscription, queue), pullIntervalMillis, TimeUnit.MILLISECONDS);
}
}