in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [425:461]
/**
 * Acknowledges one message of a queue and trims the acknowledged head of that queue's sending set.
 *
 * <p>The message whose offset equals {@code pendingDownSeqId} is flagged as acked. Every message
 * that sits in front of the first un-acked message (in the set's iteration order) is then removed;
 * for each removed message {@code updateQueueOffset} is invoked and the persist-offset flag is
 * raised. Messages behind the first un-acked one stay in the set, even if they are already acked.
 *
 * <p>Nothing happens when no sending messages are tracked for the subscription or the queue.
 * The scan and removal run while holding this object's monitor.
 *
 * @param subscription     subscription the queue belongs to; must not be {@code null}
 * @param pendingQueue     queue whose sending messages are inspected; must not be {@code null}
 * @param pendingDownSeqId offset of the message being acknowledged
 * @throws RuntimeException if {@code subscription} or {@code pendingQueue} is {@code null}
 */
public void ack(Subscription subscription, Queue pendingQueue, long pendingDownSeqId) {
    if (subscription == null) {
        throw new RuntimeException("subscription is null");
    }
    if (pendingQueue == null) {
        throw new RuntimeException("queue is null");
    }

    Map<Queue, LinkedHashSet<Message>> perQueue = sendingMessages.get(subscription);
    if (perQueue == null || perQueue.isEmpty()) {
        return;
    }
    LinkedHashSet<Message> inFlight = perQueue.get(pendingQueue);
    if (inFlight == null) {
        return;
    }

    synchronized (this) {
        if (inFlight.isEmpty()) {
            return;
        }
        // Stays true only while every message visited so far has been acked.
        boolean headFullyAcked = true;
        for (Iterator<Message> cursor = inFlight.iterator(); cursor.hasNext(); ) {
            Message current = cursor.next();
            if (pendingDownSeqId == current.getOffset()) {
                current.setAck(1);
            }
            // Non-short-circuit on purpose: the ack state is read for every message.
            headFullyAcked &= (current.getAck() == 1);
            if (!headFullyAcked) {
                // Keep scanning: the message to acknowledge may be further back in the set.
                continue;
            }
            updateQueueOffset(subscription, pendingQueue, current);
            this.markPersistOffsetFlag(true);
            cursor.remove();
        }
    }
}