private void initOffset()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [325:358]


    private void initOffset(Session session, Subscription subscription, Queue queue, QueueOffset queueOffset,
                            CompletableFuture<Void> future, AtomicInteger result) {
        if (queueOffset.isInitialized()) {
            futureDone(future, result);
            return;
        }
        if (queueOffset.isInitializing()) {
            return;
        }
        queueOffset.setInitializing();
        CompletableFuture<Long> queryResult = lmqQueueStore.queryQueueMaxOffset(queue);
        queryResult.whenComplete((maxOffset, throwable) -> {
            if (throwable != null) {
                logger.error("queryQueueMaxId onException {}", queue.getQueueName(), throwable);
                QueueOffset thisQueueOffset = session.getQueueOffset(subscription, queue);
                if (thisQueueOffset != null) {
                    if (!thisQueueOffset.isInitialized()) {
                        thisQueueOffset.setOffset(Long.MAX_VALUE);
                    }
                    thisQueueOffset.setInitialized();
                }
                futureDone(future, result);
                return;
            }
            QueueOffset thisQueueOffset = session.getQueueOffset(subscription, queue);
            if (thisQueueOffset != null) {
                if (!thisQueueOffset.isInitialized()) {
                    thisQueueOffset.setOffset(maxOffset);
                }
                thisQueueOffset.setInitialized();
            }
            futureDone(future, result);
        });
    }