in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java [147:209]
public void closeConnect(Channel channel, ChannelCloseFrom from, String reason) {
String clientId = ChannelInfo.getClientId(channel);
String channelId = ChannelInfo.getId(channel);
String ip = IpUtil.getLocalAddressCompatible();
String willKey = ip + Constants.CTRL_1 + clientId;
CompletableFuture<byte[]> willMessageFuture = willMsgPersistManager.get(willKey);
willMessageFuture.whenComplete((willMessageByte, throwable) -> {
String content = new String(willMessageByte);
if (Constants.NOT_FOUND.equals(content)) {
return;
}
if (!"disconnect".equals(reason)) {
WillMessage willMessage = JSON.parseObject(content, WillMessage.class);
int mqttId = mqttMsgId.nextId(clientId);
MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
willMessage.getQos(), mqttId, willMessage.isRetain());
Runnable runnable = new Runnable() {
@Override
public void run() {
CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(buildMqttMessageUpContext(channel), mqttMessage);
upstreamHookResult.whenComplete((hookResult, tb) -> {
try {
if (!hookResult.isSuccess()) {
executor.submit(this);
} else {
willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
if (!resultDel || tbDel != null) {
logger.error("fail to delete will message key:{}", willKey);
return;
}
logger.info("connection close and send will, delete will message key {} successfully", willKey);
});
}
} catch (Throwable t) {
logger.error("", t);
}
});
}
};
executor.submit(runnable);
}
});
if (clientId == null) {
channelMap.remove(channelId);
sessionLoop.unloadSession(clientId, channelId);
} else {
//session maybe null
Session session = sessionLoop.unloadSession(clientId, channelId);
retryDriver.unloadSession(session);
channelMap.remove(channelId);
ChannelInfo.clear(channel);
}
if (channel.isActive()) {
channel.close();
}
logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason);
}