in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java [146:184]
private void sendRetainMessage(ChannelHandlerContext ctx, Set<Subscription> subscriptions) throws InterruptedException, RemotingException, org.apache.rocketmq.remoting.exception.RemotingException {
String clientId = ChannelInfo.getClientId(ctx.channel());
Session session = sessionLoop.getSession(ChannelInfo.getId(ctx.channel()));
Set<Subscription> preciseTopics = new HashSet<>();
Set<Subscription> wildcardTopics = new HashSet<>();
for (Subscription subscription : subscriptions) {
if (!TopicUtils.isWildCard(subscription.getTopicFilter())) {
preciseTopics.add(subscription);
} else {
wildcardTopics.add(subscription);
}
}
for (Subscription subscription : preciseTopics) {
CompletableFuture<Message> retainedMessage = retainedPersistManager.getRetainedMessage(subscription.getTopicFilter());
retainedMessage.whenComplete((msg, throwable) -> {
if (msg == null) {
return;
}
pushAction._sendMessage(session, clientId, subscription, msg);
});
}
for (Subscription subscription : wildcardTopics) {
CompletableFuture<ArrayList<Message>> future = retainedPersistManager.getMsgsFromTrie(subscription);
future.whenComplete((msgsList, throwable) -> {
for (Message msg : msgsList) {
if (msg == null) {
return;
}
pushAction._sendMessage(session, clientId, subscription, msg);
}
});
}
}