in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java [70:126]
/**
 * Finishes processing of an MQTT CONNECT packet after the upstream hooks have run.
 *
 * <p>The keep-alive, client id and clean-session flag from the packet are recorded on the
 * channel. If the upstream hook failed, a CONNACK carrying the hook's sub-code is written
 * (when that sub-code maps to a known {@link MqttConnectReturnCode}) and the connection is
 * closed. Otherwise a CONNECTION_ACCEPTED ack is deferred behind a future stored on the
 * channel under {@code ChannelInfo.FUTURE_CONNECT}, session loading is started, and the
 * will message (if any) is registered.
 *
 * @param ctx                channel context of the connecting client
 * @param connectMessage     the decoded CONNECT packet
 * @param upstreamHookResult outcome of the upstream hook chain; on failure its sub-code is
 *                           used as the CONNACK return code and its remark as close reason
 */
public void doHandler(ChannelHandlerContext ctx, MqttConnectMessage connectMessage, HookResult upstreamHookResult) {
    final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
    final MqttConnectPayload payload = connectMessage.payload();
    final Channel channel = ctx.channel();

    ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
    ChannelInfo.setClientId(channel, payload.clientIdentifier());
    ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());

    if (!upstreamHookResult.isSuccess()) {
        final String remark = upstreamHookResult.getRemark();
        MqttConnectReturnCode mqttConnectReturnCode = null;
        try {
            mqttConnectReturnCode = MqttConnectReturnCode.valueOf((byte) upstreamHookResult.getSubCode());
        } catch (IllegalArgumentException e) {
            // The sub-code is not a valid CONNACK return code: no ack can be built for it,
            // but the connection must still be closed below instead of leaking the exception.
            logger.error("Connect:{}", payload.clientIdentifier(), e);
        }
        if (mqttConnectReturnCode != null) {
            channel.writeAndFlush(MqttMessageFactory.buildConnAckMessage(mqttConnectReturnCode));
        }
        channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
        return;
    }

    // Reject an inconsistent will declaration before any ack is scheduled or any session is
    // loaded, so a client that is about to be dropped can never receive CONNECTION_ACCEPTED.
    final boolean hasWill = variableHeader.isWillFlag();
    if (hasWill && (payload.willTopic() == null || payload.willMessageInBytes() == null)) {
        logger.error("Will message and will topic can not be empty");
        channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "Will message and will topic can not be empty");
        return;
    }

    final CompletableFuture<Void> future = new CompletableFuture<>();
    ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);

    // use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'
    // The timer is a fallback: it completes the future after one second if nothing else has.
    // NOTE(review): presumably the session loader completes this future earlier - confirm.
    scheduler.schedule(() -> {
        if (!future.isDone()) {
            future.complete(null);
        }
    }, 1, TimeUnit.SECONDS);

    try {
        final MqttConnAckMessage mqttConnAckMessage =
            MqttMessageFactory.buildConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
        // Send the ack only once the future completes and only if the client is still connected.
        future.thenAccept(aVoid -> {
            if (!channel.isActive()) {
                return;
            }
            ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_CONNECT);
            channel.writeAndFlush(mqttConnAckMessage);
        });

        sessionLoop.loadSession(ChannelInfo.getClientId(channel), channel);

        // save will message
        if (hasWill) {
            WillMessage willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(),
                variableHeader.isWillRetain(), variableHeader.willQos());
            sessionLoop.addWillMessage(channel, willMessage);
        }
    } catch (Exception e) {
        logger.error("Connect:{}", payload.clientIdentifier(), e);
        channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ConnectException");
    }
}