private void saveRetryQueue()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java [133:171]


    private void saveRetryQueue(String key, RetryMessage retryMessage) {
        Message message = retryMessage.message.copy();
        message.setFirstTopic(lmqQueueStore.getClientRetryTopic());
        Session session = retryMessage.session;
        int mqttMsgId = retryMessage.mqttMsgId;
        String clientId = session.getClientId();
        if (message.getRetry() >= connectConf.getMaxRetryTime()) {
            pushAction.rollNext(session, retryMessage.mqttMsgId);
            return;
        }
        String retryQueue = Subscription.newRetrySubscription(clientId).toQueueName();
        CompletableFuture<StoreResult> result = lmqQueueStore.putMessage(new HashSet<>(Arrays.asList(retryQueue)), message);
        result.whenComplete((storeResult, throwable) -> {
            if (throwable != null) {
                retryCache.put(key, retryMessage);
                return;
            }
            long queueId = storeResult.getQueue().getQueueId();
            String brokerName = storeResult.getQueue().getBrokerName();
            pushAction.rollNext(session, mqttMsgId);
            scheduler.schedule(() -> {
                Subscription subscription = Subscription.newRetrySubscription(clientId);
                List<Session> sessionList = sessionLoop.getSessionList(clientId);
                if (sessionList != null) {
                    for (Session eachSession : sessionList) {
                        Set<Queue> set = queueFresh.freshQueue(eachSession, subscription);
                        if (set == null || set.isEmpty()) {
                            continue;
                        }
                        for (Queue queue : set) {
                            if (Objects.equals(queue.getBrokerName(), brokerName)) {
                                sessionLoop.notifyPullMessage(eachSession, subscription, queue);
                            }
                        }
                    }
                }
            }, scheduleDelaySecs, TimeUnit.SECONDS);
        });
    }