void nodesCheck()

in coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java [134:175]


  /**
   * Sweeps the registered servers and refreshes the lost / unhealthy bookkeeping.
   *
   * <p>Steps, in order:
   *
   * <ol>
   *   <li>Classify every node in {@code servers}: a node whose last timestamp is older than
   *       {@code heartbeatTimeout} is marked {@link ServerStatus#LOST} and tracked in {@code
   *       lostNodes}; a node whose status is {@link ServerStatus#UNHEALTHY} is tracked in {@code
   *       unhealthyNodes}; any other node is dropped from both tracking sets.
   *   <li>Evict every tracked lost node from {@code servers}, {@code clientCache} and each tag
   *       set in {@code tagToNodes}.
   *   <li>Log the alive servers when {@code lostNodes} is non-empty or on every {@code
   *       periodicOutputIntervalTimes}-th invocation.
   *   <li>Publish the unhealthy and total server gauges.
   * </ol>
   *
   * <p>Any exception raised along the way is logged at WARN level and not rethrown.
   */
  void nodesCheck() {
    try {
      long timestamp = System.currentTimeMillis();
      for (ServerNode sn : servers.values()) {
        if (timestamp - sn.getTimestamp() > heartbeatTimeout) {
          LOG.warn("Heartbeat timeout detect, {} will be removed from node list.", sn);
          sn.setStatus(ServerStatus.LOST);
          lostNodes.add(sn);
          unhealthyNodes.remove(sn);
        } else if (ServerStatus.UNHEALTHY.equals(sn.getStatus())) {
          LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
          unhealthyNodes.add(sn);
          lostNodes.remove(sn);
        } else {
          // Deliberately leave the node's status untouched: this branch is reached for every
          // status other than UNHEALTHY, and forcing ACTIVE here would overwrite whatever
          // status the node currently carries. Only the tracking sets are updated.
          lostNodes.remove(sn);
          unhealthyNodes.remove(sn);
        }
      }
      for (ServerNode server : lostNodes) {
        // servers.remove returns null when the node was already evicted by an earlier sweep.
        ServerNode sn = servers.remove(server.getId());
        if (sn != null) {
          clientCache.invalidate(sn);
          for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
            nodesWithTag.remove(sn);
          }
        }
      }
      if (!lostNodes.isEmpty() || outputAliveServerCount % periodicOutputIntervalTimes == 0) {
        LOG.info(
            "Alive servers number: {}, ids: {}",
            servers.size(),
            servers.keySet().stream().collect(Collectors.toList()));
      }
      outputAliveServerCount++;

      CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNodes.size());
      CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
    } catch (Exception e) {
      // Best-effort sweep: log and continue so a failure here does not propagate to the caller.
      LOG.warn("Error happened in nodesCheck", e);
    }
  }