in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [280:321]
/**
 * Adds copies of the given messages to the in-flight ("sending") set kept for the
 * given subscription and queue.
 *
 * <p>Nothing is added when the message list is null or empty, when the subscription's
 * topic filter is not registered in {@code subscriptions}, or when no queue offset is
 * recorded for the subscription's queue name and the queue. A message is skipped when
 * its offset is lower than the recorded queue offset, unless the recorded offset is
 * {@code Long.MAX_VALUE} (NOTE(review): presumably a "not yet initialised" sentinel;
 * confirm against the code that writes the offset).
 *
 * <p>The per-subscription and per-queue containers are created (possibly empty) as
 * soon as the subscription is known to be registered, even if the offset lookup then
 * fails.
 *
 * @param subscription the subscription the messages belong to; must not be null
 * @param queue        the queue the messages were read from; must not be null
 * @param messages     the candidate messages; may be null or empty
 * @return {@code true} if at least one message copy was newly added to the set,
 *         {@code false} otherwise
 * @throws IllegalArgumentException if {@code subscription} or {@code queue} is null
 */
public boolean addSendingMessages(Subscription subscription, Queue queue, List<Message> messages) {
    if (subscription == null) {
        throw new IllegalArgumentException("subscription is null");
    }
    if (queue == null) {
        throw new IllegalArgumentException("queue is null");
    }
    if (messages == null || messages.isEmpty()) {
        return false;
    }
    if (!subscriptions.containsKey(subscription.getTopicFilter())) {
        return false;
    }

    // Create the containers atomically instead of containsKey/putIfAbsent/get chains.
    // Done before the offset lookup so the entries exist even when we bail out below.
    sendingMessages
        .computeIfAbsent(subscription, k -> new ConcurrentHashMap<>(16))
        .computeIfAbsent(queue, k -> new LinkedHashSet<>(8));

    String queueName = subscription.toQueueName();
    Map<Queue, QueueOffset> queueOffsetMap = offsetMap.get(queueName);
    if (queueOffsetMap == null || !queueOffsetMap.containsKey(queue)) {
        logger.warn("not found queueOffset,{},{},{}", getClientId(), subscription, queue);
        return false;
    }
    QueueOffset queueOffset = queueOffsetMap.get(queue);

    boolean added = false;
    for (Message message : messages) {
        // Read the offset once per message so both halves of the check see the same value.
        long currentOffset = queueOffset.getOffset();
        if (message.getOffset() < currentOffset && currentOffset != Long.MAX_VALUE) {
            continue;
        }
        // The LinkedHashSet itself is not thread-safe, hence the lock around the add.
        synchronized (this) {
            // Re-resolve through computeIfAbsent so a concurrent removal of the
            // subscription/queue entry cannot surface as a NullPointerException here.
            boolean inserted = sendingMessages
                .computeIfAbsent(subscription, k -> new ConcurrentHashMap<>(16))
                .computeIfAbsent(queue, k -> new LinkedHashSet<>(8))
                .add(message.copy());
            if (inserted) {
                added = true;
            }
        }
    }
    return added;
}