public void doHandler()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java [70:126]


    public void doHandler(ChannelHandlerContext ctx, MqttConnectMessage connectMessage, HookResult upstreamHookResult) {
        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        final MqttConnectPayload payload = connectMessage.payload();

        Channel channel = ctx.channel();
        ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
        ChannelInfo.setClientId(channel, connectMessage.payload().clientIdentifier());
        ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());

        String remark = upstreamHookResult.getRemark();
        if (!upstreamHookResult.isSuccess()) {
            byte connAckCode = (byte) upstreamHookResult.getSubCode();
            MqttConnectReturnCode mqttConnectReturnCode = MqttConnectReturnCode.valueOf(connAckCode);
            channel.writeAndFlush(MqttMessageFactory.buildConnAckMessage(mqttConnectReturnCode));
            channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
            return;
        }

        CompletableFuture<Void> future = new CompletableFuture<>();
        ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);

        // use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'
        scheduler.schedule(() -> {
            if (!future.isDone()) {
                future.complete(null);
            }
        }, 1, TimeUnit.SECONDS);

        try {
            MqttConnAckMessage mqttConnAckMessage = MqttMessageFactory.buildConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
            future.thenAccept(aVoid -> {
                if (!channel.isActive()) {
                    return;
                }
                ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_CONNECT);
                channel.writeAndFlush(mqttConnAckMessage);
            });
            sessionLoop.loadSession(ChannelInfo.getClientId(channel), channel);

            // save will message
            WillMessage willMessage = null;
            if (variableHeader.isWillFlag()) {
                if (payload.willTopic() == null || payload.willMessageInBytes() == null) {
                    logger.error("Will message and will topic can not be empty");
                    channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "Will message and will topic can not be empty");
                    return;
                }

                willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(), variableHeader.isWillRetain(), variableHeader.willQos());
                sessionLoop.addWillMessage(channel, willMessage);
            }

        } catch (Exception e) {
            logger.error("Connect:{}", payload.clientIdentifier(), e);
            channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ConnectException");
        }
    }