protected void channelRead0()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java [95:144]


    /**
     * Dispatches one decoded MQTT message: validates it, runs the pre-handler, passes it
     * through the upstream hook chain and finally hands it to {@code _channelRead0}.
     *
     * <p>Reference counting: a {@link MqttPublishMessage} is retained once before the hook is
     * invoked, because the hook result may complete after this method has returned. That extra
     * reference is released exactly once on every path, and only after the last use of
     * {@code msg}:
     * <ul>
     *   <li>hook dispatch or synchronous handling throws: released in the catch block;</li>
     *   <li>the hook manager returns {@code null}: released after the synchronous
     *       {@code _channelRead0} call;</li>
     *   <li>the hook future completes (normally or exceptionally): released in the callback's
     *       {@code finally}, after {@code _channelRead0} or the exception notification.</li>
     * </ul>
     *
     * @param ctx the channel handler context the message arrived on
     * @param msg the decoded MQTT message
     * @throws ChannelDecodeException if the decoder reported a failure for {@code msg}
     * @throws ChannelException if invoking the upstream hook, or handling the message
     *         synchronously when no hook future is returned, throws
     */
    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        if (!ctx.channel().isActive()) {
            return;
        }
        if (!msg.decoderResult().isSuccess()) {
            throw new ChannelDecodeException(ChannelInfo.getClientId(ctx.channel()) + "," + msg.decoderResult());
        }
        ChannelInfo.touch(ctx.channel());
        boolean preResult = preHandler(ctx, msg);
        if (!preResult) {
            return;
        }
        // Only publish messages get the extra retain/release pair below.
        final boolean isPublish = msg instanceof MqttPublishMessage;
        CompletableFuture<HookResult> upstreamHookResult;
        try {
            if (isPublish) {
                ((MqttPublishMessage) msg).retain();
            }
            upstreamHookResult = upstreamHookManager.doUpstreamHook(buildMqttMessageUpContext(ctx), msg);
            if (upstreamHookResult == null) {
                _channelRead0(ctx, msg, null);
            }
        } catch (Throwable t) {
            logger.error("", t);
            if (isPublish) {
                ReferenceCountUtil.release(msg);
            }
            throw new ChannelException(t.getMessage());
        }
        if (upstreamHookResult == null) {
            // Handled synchronously above and no callback will run, so the extra reference
            // must be dropped here; otherwise it would never be released.
            if (isPublish) {
                ReferenceCountUtil.release(msg);
            }
            return;
        }
        upstreamHookResult.whenComplete((hookResult, throwable) -> {
            try {
                if (throwable != null) {
                    logger.error("", throwable);
                    ctx.fireExceptionCaught(new ChannelException(throwable.getMessage()));
                    return;
                }
                if (hookResult == null) {
                    ctx.fireExceptionCaught(new ChannelException("UpstreamHook Result Unknown"));
                    return;
                }
                try {
                    _channelRead0(ctx, msg, hookResult);
                } catch (Throwable t) {
                    logger.error("", t);
                    ctx.fireExceptionCaught(new ChannelException(t.getMessage()));
                }
            } finally {
                // Release only after msg is no longer needed. NOTE(review): if the enclosing
                // handler auto-releases msg once channelRead0 returns, this may be the last
                // reference by the time an asynchronous hook completes - confirm.
                if (isPublish) {
                    ReferenceCountUtil.release(msg);
                }
            }
        });
    }