in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java [187:236]
/**
 * Publishes the persisted will messages stored under the key range of the given connect-server
 * ip, and deletes each will message once it has been published successfully.
 *
 * <p>The will store is scanned for keys between {@code ip + CTRL_0} and {@code ip + CTRL_2}.
 * For each entry the client id is taken from the part of the key following the
 * {@code ip + CTRL_1} prefix, the stored JSON is parsed into a {@link WillMessage}, and the
 * resulting publish is submitted to {@code executor}. A publish whose hook result reports
 * failure is re-submitted; a publish that completes exceptionally is logged and left persisted.
 *
 * <p>All work is asynchronous; failures are logged and never propagated to the caller.
 *
 * @param ip ip of the connect server whose will messages should be handled
 */
private void handleShutDownCS(String ip) {
    String startClientKey = ip + Constants.CTRL_0;
    String endClientKey = ip + Constants.CTRL_2;
    // Loop-invariant: every will key starts with "<ip><CTRL_1>", the client id follows it.
    final int clientIdOffset = (ip + Constants.CTRL_1).length();
    willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
        if (willMap == null || throwable != null) {
            logger.error("{} master fail to scan cs", ip, throwable);
            return;
        }
        if (willMap.isEmpty()) {
            logger.info("the cs:{} has no will", ip);
            return;
        }
        for (Map.Entry<String, String> entry : willMap.entrySet()) {
            String willKey = entry.getKey();
            logger.info("master handle will {} {}", willKey, entry.getValue());
            // Isolate each entry: an exception thrown from inside whenComplete is silently
            // dropped, so one malformed entry must not abort the remaining will messages.
            try {
                String clientId = willKey.substring(clientIdOffset);
                WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
                if (willMessage == null) {
                    logger.error("fail to parse will message key:{}", willKey);
                    continue;
                }
                int mqttId = mqttMsgId.nextId(clientId);
                MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
                        willMessage.getQos(), mqttId, willMessage.isRetain());
                // Anonymous class (not a lambda) so that `this` below refers to the task itself
                // and it can re-submit itself for a retry.
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
                            upstreamHookResult.whenComplete((hookResult, tb) -> {
                                try {
                                    // On exceptional completion hookResult is null; check first
                                    // instead of tripping an NPE that hides the real cause.
                                    if (tb != null || hookResult == null) {
                                        logger.error("fail to publish will message key:{}", willKey, tb);
                                        return;
                                    }
                                    if (!hookResult.isSuccess()) {
                                        executor.submit(this);
                                        return;
                                    }
                                    willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
                                        // Test the throwable first: resultDel is null on
                                        // exceptional completion and must not be unboxed.
                                        if (tbDel != null || !Boolean.TRUE.equals(resultDel)) {
                                            logger.error("fail to delete will message key:{}", willKey, tbDel);
                                            return;
                                        }
                                        logger.info("delete will message key {} successfully", willKey);
                                    });
                                } catch (Throwable t) {
                                    logger.error("fail to handle publish result of will message key:{}", willKey, t);
                                }
                            });
                        } catch (Throwable t) {
                            // executor.submit() would otherwise capture this in a Future that
                            // nobody inspects.
                            logger.error("fail to publish will message key:{}", willKey, t);
                        }
                    }
                };
                executor.submit(runnable);
            } catch (Throwable e) {
                logger.error("fail to handle will message key:{}", willKey, e);
            }
        }
    });
}