public void closeConnect()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java [147:209]


    public void closeConnect(Channel channel, ChannelCloseFrom from, String reason) {
        String clientId = ChannelInfo.getClientId(channel);
        String channelId = ChannelInfo.getId(channel);
        String ip = IpUtil.getLocalAddressCompatible();

        String willKey = ip + Constants.CTRL_1 + clientId;
        CompletableFuture<byte[]> willMessageFuture = willMsgPersistManager.get(willKey);
        willMessageFuture.whenComplete((willMessageByte, throwable) -> {
            String content = new String(willMessageByte);
            if (Constants.NOT_FOUND.equals(content)) {
                return;
            }

            if (!"disconnect".equals(reason)) {
                WillMessage willMessage = JSON.parseObject(content, WillMessage.class);

                int mqttId = mqttMsgId.nextId(clientId);
                MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
                        willMessage.getQos(), mqttId, willMessage.isRetain());

                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(buildMqttMessageUpContext(channel), mqttMessage);
                        upstreamHookResult.whenComplete((hookResult, tb) -> {
                            try {
                                if (!hookResult.isSuccess()) {
                                    executor.submit(this);
                                } else {
                                    willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
                                        if (!resultDel || tbDel != null) {
                                            logger.error("fail to delete will message key:{}", willKey);
                                            return;
                                        }
                                        logger.info("connection close and send will, delete will message key {} successfully", willKey);
                                    });
                                }
                            } catch (Throwable t) {
                                logger.error("", t);
                            }
                        });
                    }
                };
                executor.submit(runnable);
            }
        });

        if (clientId == null) {
            channelMap.remove(channelId);
            sessionLoop.unloadSession(clientId, channelId);
        } else {
            //session maybe null
            Session session = sessionLoop.unloadSession(clientId, channelId);
            retryDriver.unloadSession(session);
            channelMap.remove(channelId);
            ChannelInfo.clear(channel);
        }

        if (channel.isActive()) {
            channel.close();
        }
        logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason);
    }