public Message rollNext()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [323:356]


    public Message rollNext(Subscription subscription, Queue pendingQueue, long pendingDownSeqId) {
        if (subscription == null) {
            throw new RuntimeException("subscription is null");
        }
        if (pendingQueue == null) {
            throw new RuntimeException("queue is null");
        }
        Map<Queue, LinkedHashSet<Message>> queueSendingMsgs = sendingMessages.get(subscription);
        if (queueSendingMsgs == null || queueSendingMsgs.isEmpty()) {
            return null;
        }
        LinkedHashSet<Message> messages = queueSendingMsgs.get(pendingQueue);
        if (messages == null) {
            return null;
        }
        Message message;
        Message nextMessage = null;
        synchronized (this) {
            if (messages.isEmpty()) {
                return null;
            }
            message = messages.iterator().next();
            if (message.getOffset() != pendingDownSeqId) {
                return null;
            }
            messages.remove(message);
            updateQueueOffset(subscription, pendingQueue, message);
            this.markPersistOffsetFlag(true);
            if (!messages.isEmpty()) {
                nextMessage = messages.iterator().next();
            }
        }
        return nextMessage;
    }