in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java [95:144]
/**
 * Dispatches one decoded MQTT message: validates it, runs the pre-handler and the upstream
 * hook, and then passes the message together with the hook result to {@code _channelRead0}.
 *
 * <p>A {@code MqttPublishMessage} is retained before the hook is invoked, because the hook
 * result may complete after this method has returned. That extra reference is released
 * exactly once on every path: after synchronous handling when the hook manager returns
 * {@code null}, in the catch block when hook invocation or synchronous handling throws, and
 * in the completion callback after the message has been handled.
 *
 * <p>NOTE(review): this assumes the caller drops its own reference to {@code msg} once this
 * method returns (as a Netty {@code SimpleChannelInboundHandler} does) - confirm against the
 * enclosing class.
 *
 * @param ctx the channel handler context the message arrived on
 * @param msg the decoded MQTT message
 * @throws ChannelDecodeException if the decoder reported a failure for {@code msg}
 * @throws ChannelException if invoking the upstream hook, or handling the message when the
 *                          hook manager returns {@code null}, throws
 */
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
    if (!ctx.channel().isActive()) {
        return;
    }
    if (!msg.decoderResult().isSuccess()) {
        throw new ChannelDecodeException(ChannelInfo.getClientId(ctx.channel()) + "," + msg.decoderResult());
    }
    ChannelInfo.touch(ctx.channel());
    boolean preResult = preHandler(ctx, msg);
    if (!preResult) {
        // The pre-handler rejected the message; nothing further is dispatched.
        return;
    }

    // Only publish messages are retained here, so only they need a matching release.
    final boolean retainedForHook = msg instanceof MqttPublishMessage;
    CompletableFuture<HookResult> upstreamHookResult;
    try {
        if (retainedForHook) {
            ((MqttPublishMessage) msg).retain();
        }
        upstreamHookResult = upstreamHookManager.doUpstreamHook(buildMqttMessageUpContext(ctx), msg);
        if (upstreamHookResult == null) {
            // No hook result to wait for: handle the message synchronously.
            _channelRead0(ctx, msg, null);
        }
    } catch (Throwable t) {
        logger.error("", t);
        if (retainedForHook) {
            ReferenceCountUtil.release(msg);
        }
        throw new ChannelException(t.getMessage());
    }

    if (upstreamHookResult == null) {
        // Synchronous path finished normally: drop the reference taken above, which was
        // previously never released on this path.
        if (retainedForHook) {
            ReferenceCountUtil.release(msg);
        }
        return;
    }

    upstreamHookResult.whenComplete((hookResult, throwable) -> {
        try {
            if (throwable != null) {
                logger.error("", throwable);
                ctx.fireExceptionCaught(new ChannelException(throwable.getMessage()));
                return;
            }
            if (hookResult == null) {
                ctx.fireExceptionCaught(new ChannelException("UpstreamHook Result Unknown"));
                return;
            }
            try {
                _channelRead0(ctx, msg, hookResult);
            } catch (Throwable t) {
                logger.error("", t);
                ctx.fireExceptionCaught(new ChannelException(t.getMessage()));
            }
        } finally {
            // Release only after the message has been handled, so the handler never
            // observes a message whose reference count has already been dropped.
            if (retainedForHook) {
                ReferenceCountUtil.release(msg);
            }
        }
    });
}