private void handleShutDownCS(String ip)

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java [187:236]


    /**
     * Publishes, and then deletes, every will message persisted under the given connection-server ip.
     *
     * <p>The persisted entries are scanned over the key range {@code ip + Constants.CTRL_0} to
     * {@code ip + Constants.CTRL_2}. For each entry the client id is taken as the part of the key that
     * follows the {@code ip + Constants.CTRL_1} prefix, the stored JSON value is parsed into a
     * {@link WillMessage}, and a publish task is submitted to {@code executor}. When the upstream hook
     * reports success the persisted entry is deleted; when it reports a non-successful result the same
     * task is resubmitted.
     *
     * <p>All work happens asynchronously in future callbacks, so failures are logged rather than
     * propagated to the caller. A failure while handling one entry does not prevent the remaining
     * entries from being handled.
     *
     * @param ip ip of the connection server whose will messages should be handled
     */
    private void handleShutDownCS(String ip) {
        String startClientKey = ip + Constants.CTRL_0;
        String endClientKey = ip + Constants.CTRL_2;
        // Loop-invariant: length of the key prefix that precedes the client id.
        int clientIdOffset = (ip + Constants.CTRL_1).length();

        willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
            if (willMap == null || throwable != null) {
                logger.error("{} master fail to scan cs", ip, throwable);
                return;
            }

            if (willMap.isEmpty()) {
                logger.info("the cs:{} has no will", ip);
                return;
            }

            for (Map.Entry<String, String> entry : willMap.entrySet()) {
                String willKey = entry.getKey();
                logger.info("master handle will {} {}", willKey, entry.getValue());
                try {
                    String clientId = willKey.substring(clientIdOffset);

                    WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
                    if (willMessage == null) {
                        // Nothing to publish for a missing/empty stored value; keep going with the others.
                        logger.error("skip will message key:{}, stored value is empty", willKey);
                        continue;
                    }

                    int mqttId = mqttMsgId.nextId(clientId);
                    MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
                            willMessage.getQos(), mqttId, willMessage.isRetain());
                    Runnable runnable = new Runnable() {
                        @Override
                        public void run() {
                            CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
                            upstreamHookResult.whenComplete((hookResult, tb) -> {
                                try {
                                    // On exceptional completion hookResult is null; report the real cause
                                    // instead of tripping over a NullPointerException.
                                    if (tb != null || hookResult == null) {
                                        logger.error("fail to publish will message key:{}", willKey, tb);
                                        return;
                                    }
                                    if (!hookResult.isSuccess()) {
                                        // 'this' is the enclosing Runnable: retry the same publish task.
                                        executor.submit(this);
                                        return;
                                    }
                                    willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
                                        // Check the throwable first: resultDel is null when the delete
                                        // future failed, so it must not be dereferenced/unboxed.
                                        if (tbDel != null || !Boolean.TRUE.equals(resultDel)) {
                                            logger.error("fail to delete will message key:{}", willKey, tbDel);
                                            return;
                                        }
                                        logger.info("delete will message key {} successfully", willKey);
                                    });
                                } catch (Throwable t) {
                                    logger.error("fail to handle publish result of will message key:{}", willKey, t);
                                }
                            });
                        }
                    };
                    executor.submit(runnable);
                } catch (Exception e) {
                    // Isolate per-entry failures (bad key, malformed JSON, rejected task) so that one
                    // broken entry does not abort the handling of the remaining will messages.
                    logger.error("fail to handle will message key:{}", willKey, e);
                }
            }
        });
    }