public void ack()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [425:461]


    public void ack(Subscription subscription, Queue pendingQueue, long pendingDownSeqId) {
        if (subscription == null) {
            throw new RuntimeException("subscription is null");
        }
        if (pendingQueue == null) {
            throw new RuntimeException("queue is null");
        }
        Map<Queue, LinkedHashSet<Message>> queueSendingMsgs = sendingMessages.get(subscription);
        if (queueSendingMsgs == null || queueSendingMsgs.isEmpty()) {
            return;
        }
        LinkedHashSet<Message> messages = queueSendingMsgs.get(pendingQueue);
        if (messages == null) {
            return;
        }
        synchronized (this) {
            if (messages.isEmpty()) {
                return;
            }
            boolean flag = true;
            Iterator<Message> iterator = messages.iterator();
            while (iterator.hasNext()) {
                Message message = iterator.next();
                if (message.getOffset() == pendingDownSeqId) {
                    message.setAck(1);
                }
                if (message.getAck() != 1) {
                    flag = false;
                }
                if (flag) {
                    updateQueueOffset(subscription, pendingQueue, message);
                    this.markPersistOffsetFlag(true);
                    iterator.remove();
                }
            }
        }
    }