in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java [116:185]
/**
 * Performs one iteration of the master duties, if this node is the current master.
 *
 * <p>Reads the {@code Constants.CS_MASTER} record; when its content is
 * {@code <local ip><COLON>...}, this node:
 * <ol>
 *   <li>refreshes the record with the current timestamp via compare-and-put;</li>
 *   <li>scans the {@code Constants.CS_ALIVE} key range and, for each entry whose heartbeat
 *       is older than {@code 10 * checkAliveIntervalMillis}, calls
 *       {@link #handleShutDownCS(String)} and deletes the entry.</li>
 * </ol>
 *
 * <p>All store operations are asynchronous; failures are logged and never propagated
 * to the caller.
 */
private void masterLoop() {
    try {
        String ip = IpUtil.getLocalAddressCompatible();
        if (ip == null) {
            logger.error("can not get local ip");
            return;
        }
        willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
            if (result == null || throwable != null) {
                logger.error("fail to get CS_MASTER", throwable);
                return;
            }
            // NOTE(review): decodes with the platform default charset; confirm it matches the writer side.
            String content = new String(result);
            if (Constants.NOT_FOUND.equals(content)) {
                // no master
                return;
            }
            // Match "<ip><COLON>" rather than the bare ip, so that e.g. 10.0.0.1 does not
            // mistake a record owned by 10.0.0.11 for its own.
            if (!content.startsWith(ip + Constants.COLON)) {
                // is not master
                return;
            }
            // master keep alive
            long currentTime = System.currentTimeMillis();
            willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
                // On exceptional completion the result is null; check the throwable first and
                // never auto-unbox the result, otherwise the failure is masked by an NPE.
                if (tb != null || !Boolean.TRUE.equals(rs)) {
                    logger.error("{} fail to update master", ip, tb);
                }
            });
            // master to check all cs state
            String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
            String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
            willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
                if (rs == null || tb != null) {
                    logger.error("{} master fail to scan cs", ip, tb);
                    return;
                }
                if (rs.isEmpty()) {
                    logger.info("master scanned 0 cs");
                    return;
                }
                // Loop-invariant helpers: one formatter per scan (local, so no sharing across threads)
                // and the length of the key prefix that precedes the cs ip.
                SimpleDateFormat heartFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                int csKeyPrefixLength = (Constants.CS_ALIVE + Constants.CTRL_1).length();
                for (Map.Entry<String, String> entry : rs.entrySet()) {
                    String key = entry.getKey();
                    String value = entry.getValue();
                    long heartbeat;
                    try {
                        heartbeat = Long.parseLong(value);
                    } catch (NumberFormatException e) {
                        // A single malformed heartbeat must not abort the check of the remaining cs entries.
                        logger.error("master:{} scan cs:{} with invalid heart:{}", ip, key, value, e);
                        continue;
                    }
                    logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, heartFormat.format(heartbeat));
                    if (System.currentTimeMillis() - heartbeat > 10 * checkAliveIntervalMillis) {
                        // the cs has down
                        String csIp = key.substring(csKeyPrefixLength);
                        handleShutDownCS(csIp);
                        willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
                            // Same null-safe check as above; also keep the cause in the log.
                            if (tbDel != null || !Boolean.TRUE.equals(resultDel)) {
                                logger.error("fail to delete shutDown cs:{} in db", key, tbDel);
                                return;
                            }
                            logger.debug("delete shutDown cs:{} in db successfully", key);
                        });
                    }
                }
            });
        });
    } catch (Exception e) {
        logger.error("", e);
    }
}