in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java [76:104]
/**
 * Runs one iteration of the connection-server loop.
 *
 * <p>Steps performed:
 * <ol>
 *   <li>Stores the current timestamp under the key
 *       {@code Constants.CS_ALIVE + Constants.CTRL_1 + <local ip>} (presumably a liveness
 *       heartbeat for this node - confirm against the readers of that key).</li>
 *   <li>Reads the value stored under {@code Constants.CS_MASTER}. If it equals
 *       {@code Constants.NOT_FOUND} or {@link #masterHasDown} returns {@code true} for it,
 *       attempts a compare-and-put that replaces the observed value with
 *       {@code <local ip> + Constants.COLON + <current time>}.</li>
 * </ol>
 *
 * <p>Failures are logged and never propagated to the caller.
 */
private void csLoop() {
    try {
        String ip = IpUtil.getLocalAddressCompatible();
        String csKey = Constants.CS_ALIVE + Constants.CTRL_1 + ip;
        String masterKey = Constants.CS_MASTER;
        long currentTime = System.currentTimeMillis();

        willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
            if (result == null || throwable != null) {
                logger.error("{} fail to put csKey", csKey, throwable);
            }
        });

        willMsgPersistManager.get(masterKey).whenComplete((result, throwable) -> {
            // On exceptional completion whenComplete hands over a null result. Dereferencing it
            // would throw inside the callback, and because the returned future is discarded the
            // failure would vanish without a log line. Check first.
            if (throwable != null || result == null) {
                logger.error("{} fail to get master", ip, throwable);
                return;
            }

            // NOTE(review): decodes with the platform default charset, as before - confirm it
            // matches the encoding used when the value was written.
            String content = new String(result);
            if (!Constants.NOT_FOUND.equals(content) && !masterHasDown(content)) {
                // A master is registered and not considered down: nothing to do.
                return;
            }

            String newMaster = ip + Constants.COLON + currentTime;
            willMsgPersistManager.compareAndPut(masterKey, content, newMaster).whenComplete((rs, tb) -> {
                // Test the throwable before the result: rs is null when the future failed, and
                // unboxing it via "!rs" would raise a NullPointerException.
                if (tb != null || !Boolean.TRUE.equals(rs)) {
                    logger.error("{} fail to update master", ip, tb);
                    return;
                }
                logger.info("{} update master successfully", ip);
            });
        });
    } catch (Exception e) {
        // Only covers synchronous failures (e.g. resolving the local address or submitting
        // the requests); asynchronous failures are handled inside the callbacks above.
        logger.error("", e);
    }
}