in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [325:358]
private void initOffset(Session session, Subscription subscription, Queue queue, QueueOffset queueOffset,
CompletableFuture<Void> future, AtomicInteger result) {
if (queueOffset.isInitialized()) {
futureDone(future, result);
return;
}
if (queueOffset.isInitializing()) {
return;
}
queueOffset.setInitializing();
CompletableFuture<Long> queryResult = lmqQueueStore.queryQueueMaxOffset(queue);
queryResult.whenComplete((maxOffset, throwable) -> {
if (throwable != null) {
logger.error("queryQueueMaxId onException {}", queue.getQueueName(), throwable);
QueueOffset thisQueueOffset = session.getQueueOffset(subscription, queue);
if (thisQueueOffset != null) {
if (!thisQueueOffset.isInitialized()) {
thisQueueOffset.setOffset(Long.MAX_VALUE);
}
thisQueueOffset.setInitialized();
}
futureDone(future, result);
return;
}
QueueOffset thisQueueOffset = session.getQueueOffset(subscription, queue);
if (thisQueueOffset != null) {
if (!thisQueueOffset.isInitialized()) {
thisQueueOffset.setOffset(maxOffset);
}
thisQueueOffset.setInitialized();
}
futureDone(future, result);
});
}