private void masterLoop()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java [116:185]


    private void masterLoop() {
        try {
            String ip = IpUtil.getLocalAddressCompatible();
            if (ip == null) {
                logger.error("can not get local ip");
                return;
            }

            willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
                if (result == null || throwable != null) {
                    logger.error("fail to get CS_MASTER", throwable);
                    return;
                }

                String content = new String(result);
                if (Constants.NOT_FOUND.equals(content)) {
                    // no master
                    return;
                }

                if (!content.startsWith(ip)) {
                    // is not master
                    return;
                }
                // master keep alive
                long currentTime = System.currentTimeMillis();
                willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
                    if (!rs || tb != null) {
                        logger.error("{} fail to update master", ip, tb);
                    }
                });

                // master to check all cs state
                String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
                String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
                willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
                    if (rs == null || tb != null) {
                        logger.error("{} master fail to scan cs", ip, tb);
                        return;
                    }

                    if (rs.size() == 0) {
                        logger.info("master scanned 0 cs");
                        return;
                    }

                    for (Map.Entry<String, String> entry : rs.entrySet()) {
                        String key = entry.getKey();
                        String value = entry.getValue();
                        logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
                        if (System.currentTimeMillis() - Long.parseLong(value) > 10 * checkAliveIntervalMillis) {
                            // the cs has down
                            String csIp = key.substring((Constants.CS_ALIVE + Constants.CTRL_1).length());
                            handleShutDownCS(csIp);

                            willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
                                if (!resultDel || tbDel != null) {
                                    logger.error("fail to delete shutDown cs:{} in db", key);
                                    return;
                                }
                                logger.debug("delete shutDown cs:{} in db successfully", key);
                            });
                        }
                    }
                });
            });
        } catch (Exception e) {
            logger.error("", e);
        }
    }