in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [398:423]
/**
 * Collects the messages recorded in {@code sendingMessages} for the given subscription and
 * queue whose {@code getAck()} value is {@code -1}.
 * NOTE(review): {@code -1} presumably marks a message that has not been acknowledged yet;
 * confirm against {@code Message}.
 *
 * <p>The result is a freshly allocated list, so callers may modify it without affecting the
 * session's own bookkeeping. Messages are added in the iteration order of the underlying
 * {@link LinkedHashSet}.
 *
 * @param subscription the subscription to look up; must not be {@code null}
 * @param queue        the queue to look up under that subscription; must not be {@code null}
 * @return the matching messages; an empty list (never {@code null}) when nothing is recorded
 *         for the subscription or the queue, or when no recorded message has ack {@code -1}
 * @throws RuntimeException if {@code subscription} or {@code queue} is {@code null}
 */
public List<Message> pendMessageList(Subscription subscription, Queue queue) {
    if (subscription == null) {
        throw new RuntimeException("subscription is null");
    }
    if (queue == null) {
        throw new RuntimeException("queue is null");
    }
    List<Message> list = new ArrayList<>();
    Map<Queue, LinkedHashSet<Message>> tmp = sendingMessages.get(subscription);
    if (tmp == null || tmp.isEmpty()) {
        return list;
    }
    LinkedHashSet<Message> messages = tmp.get(queue);
    if (messages == null) {
        // An unknown queue is reported the same way as an unknown subscription: with an
        // empty list. Returning null here made the contract inconsistent and exposed
        // callers that only check isEmpty() to a NullPointerException.
        return list;
    }
    // Iterate while holding this session's monitor. LinkedHashSet is not thread-safe, so
    // this presumably pairs with writers that modify the set under the same lock
    // (TODO confirm against the methods that add/remove messages).
    synchronized (this) {
        for (Message message : messages) {
            if (message.getAck() == -1) {
                list.add(message);
            }
        }
    }
    return list;
}