public boolean addSendingMessages()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [280:321]


    /**
     * Records copies of the given messages as "sending" for a subscription/queue pair.
     *
     * <p>Nothing is stored and {@code false} is returned when {@code messages} is null or
     * empty, when the subscription's topic filter is not present in {@code subscriptions},
     * or when no {@link QueueOffset} is recorded for the queue under the subscription's
     * queue name (the latter case is logged at WARN level).
     *
     * <p>A message is skipped when its offset is lower than the recorded queue offset,
     * unless the recorded offset equals {@link Long#MAX_VALUE}, in which case every message
     * is accepted.
     *
     * @param subscription the subscription the messages belong to; must not be null
     * @param queue        the queue the messages were read from; must not be null
     * @param messages     the candidate messages; may be null or empty
     * @return {@code true} if at least one message copy was newly added to the sending set,
     *         {@code false} otherwise
     * @throws RuntimeException if {@code subscription} or {@code queue} is null
     */
    public boolean addSendingMessages(Subscription subscription, Queue queue, List<Message> messages) {
        if (subscription == null) {
            throw new RuntimeException("subscription is null");
        }
        if (queue == null) {
            throw new RuntimeException("queue is null");
        }
        if (messages == null || messages.isEmpty()) {
            return false;
        }
        if (!subscriptions.containsKey(subscription.getTopicFilter())) {
            return false;
        }

        // Resolve (creating if needed) the per-queue set exactly once and keep the reference,
        // so a concurrent removal of the subscription/queue entry cannot make a later
        // re-lookup return null in the middle of the loop.
        java.util.Set<Message> pending = sendingMessages
            .computeIfAbsent(subscription, key -> new ConcurrentHashMap<>(16))
            .computeIfAbsent(queue, key -> new LinkedHashSet<>(8));

        // Single lookup instead of containsKey() + get(): the entry may disappear in between.
        Map<Queue, QueueOffset> queueOffsetMap = offsetMap.get(subscription.toQueueName());
        QueueOffset queueOffset = queueOffsetMap == null ? null : queueOffsetMap.get(queue);
        if (queueOffset == null) {
            logger.warn("not found queueOffset,{},{},{}", getClientId(), subscription, queue);
            return false;
        }

        boolean added = false;
        for (Message message : messages) {
            // Read the offset once per message so both halves of the check see the same value.
            // NOTE(review): Long.MAX_VALUE presumably marks a not-yet-initialized offset - confirm.
            long knownOffset = queueOffset.getOffset();
            if (knownOffset != Long.MAX_VALUE && message.getOffset() < knownOffset) {
                continue;
            }
            // The per-queue set is a plain LinkedHashSet, so mutation is guarded by this lock.
            synchronized (this) {
                if (pending.add(message.copy())) {
                    added = true;
                }
            }
        }
        return added;
    }