private void sendRetainMessage()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java [146:184]


    private void sendRetainMessage(ChannelHandlerContext ctx, Set<Subscription> subscriptions) throws InterruptedException, RemotingException, org.apache.rocketmq.remoting.exception.RemotingException {

        String clientId = ChannelInfo.getClientId(ctx.channel());
        Session session = sessionLoop.getSession(ChannelInfo.getId(ctx.channel()));
        Set<Subscription> preciseTopics = new HashSet<>();
        Set<Subscription> wildcardTopics = new HashSet<>();

        for (Subscription subscription : subscriptions) {
            if (!TopicUtils.isWildCard(subscription.getTopicFilter())) {
                preciseTopics.add(subscription);
            } else {
                wildcardTopics.add(subscription);
            }
        }

        for (Subscription subscription : preciseTopics) {
            CompletableFuture<Message> retainedMessage = retainedPersistManager.getRetainedMessage(subscription.getTopicFilter());
            retainedMessage.whenComplete((msg, throwable) -> {
                if (msg == null) {
                    return;
                }
                pushAction._sendMessage(session, clientId, subscription, msg);
            });
        }

        for (Subscription subscription : wildcardTopics) {

            CompletableFuture<ArrayList<Message>> future = retainedPersistManager.getMsgsFromTrie(subscription);
            future.whenComplete((msgsList, throwable) -> {
                for (Message msg : msgsList) {
                    if (msg == null) {
                        return;
                    }
                    pushAction._sendMessage(session, clientId, subscription, msg);
                }
            });

        }
    }