in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [323:356]
/**
 * Removes the first in-flight message of {@code pendingQueue} when its offset equals
 * {@code pendingDownSeqId}, and returns the message that is then first in line.
 *
 * <p>On a successful removal the queue offset is updated through
 * {@code updateQueueOffset} and the persist-offset flag is set to {@code true}.
 *
 * @param subscription     subscription whose in-flight messages are consulted; must not be null
 * @param pendingQueue     queue whose first in-flight message is examined; must not be null
 * @param pendingDownSeqId offset the first in-flight message must carry for it to be removed
 * @return the message that becomes first after the removal, or {@code null} when nothing is
 *         tracked for the subscription/queue, the in-flight set is empty, the first message's
 *         offset differs from {@code pendingDownSeqId}, or no message remains after the removal
 * @throws RuntimeException if {@code subscription} or {@code pendingQueue} is null
 */
public Message rollNext(Subscription subscription, Queue pendingQueue, long pendingDownSeqId) {
    if (subscription == null) {
        throw new RuntimeException("subscription is null");
    }
    if (pendingQueue == null) {
        throw new RuntimeException("queue is null");
    }

    // These lookups are performed before taking the lock, as in the original flow.
    Map<Queue, LinkedHashSet<Message>> inFlightByQueue = sendingMessages.get(subscription);
    if (inFlightByQueue == null || inFlightByQueue.isEmpty()) {
        return null;
    }
    LinkedHashSet<Message> inFlight = inFlightByQueue.get(pendingQueue);
    if (inFlight == null) {
        return null;
    }

    synchronized (this) {
        if (inFlight.isEmpty()) {
            return null;
        }
        // LinkedHashSet iterates in insertion order, so this is the oldest entry.
        Message head = inFlight.iterator().next();
        if (head.getOffset() != pendingDownSeqId) {
            // The first in-flight message is not the one being rolled; leave state untouched.
            return null;
        }
        inFlight.remove(head);
        updateQueueOffset(subscription, pendingQueue, head);
        this.markPersistOffsetFlag(true);
        // Peek (without removing) at whatever is now first, if anything is left.
        return inFlight.isEmpty() ? null : inFlight.iterator().next();
    }
}