in coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java [134:175]
/**
 * Runs one liveness sweep over the registered servers.
 *
 * <p>For every node in {@code servers}:
 *
 * <ul>
 *   <li>if its last heartbeat timestamp is older than {@code heartbeatTimeout} ms, it is marked
 *       {@link ServerStatus#LOST}, added to {@code lostNodes} and removed from
 *       {@code unhealthyNodes};
 *   <li>otherwise, if it reports {@link ServerStatus#UNHEALTHY}, it is added to
 *       {@code unhealthyNodes} (and removed from {@code lostNodes});
 *   <li>otherwise it is marked {@link ServerStatus#ACTIVE} and removed from both sets.
 * </ul>
 *
 * <p>Nodes in {@code lostNodes} are then evicted from {@code servers}, their cached client is
 * invalidated, and they are removed from every tag group in {@code tagToNodes}. The alive-server
 * list is logged when this sweep evicted at least one node, or once every
 * {@code periodicOutputIntervalTimes} sweeps. Finally the unhealthy/total server gauges are
 * refreshed.
 *
 * <p>Any exception is logged and swallowed rather than propagated. NOTE(review): presumably this
 * runs on a scheduled executor, where a thrown exception would cancel further runs — confirm.
 */
void nodesCheck() {
  try {
    long timestamp = System.currentTimeMillis();
    for (ServerNode sn : servers.values()) {
      if (timestamp - sn.getTimestamp() > heartbeatTimeout) {
        LOG.warn("Heartbeat timeout detect, {} will be removed from node list.", sn);
        sn.setStatus(ServerStatus.LOST);
        lostNodes.add(sn);
        unhealthyNodes.remove(sn);
      } else if (ServerStatus.UNHEALTHY.equals(sn.getStatus())) {
        LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
        unhealthyNodes.add(sn);
        lostNodes.remove(sn);
      } else {
        sn.setStatus(ServerStatus.ACTIVE);
        lostNodes.remove(sn);
        unhealthyNodes.remove(sn);
      }
    }

    // This method never clears lostNodes. The loop above only drops entries for nodes still in
    // `servers`, so nodes evicted in earlier sweeps stay in it. Count only this sweep's evictions;
    // otherwise the alive-server log below would fire on every sweep after the first loss.
    int evictedThisRound = 0;
    for (ServerNode server : lostNodes) {
      // Conditional remove: evict the id only while it still maps to the node that was marked
      // lost. A server that re-registered under the same id (a fresh ServerNode) must stay.
      if (servers.remove(server.getId(), server)) {
        evictedThisRound++;
        clientCache.invalidate(server);
        for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
          nodesWithTag.remove(server);
        }
      }
    }

    if (evictedThisRound > 0 || outputAliveServerCount % periodicOutputIntervalTimes == 0) {
      LOG.info(
          "Alive servers number: {}, ids: {}",
          servers.size(),
          servers.keySet().stream().collect(Collectors.toList()));
    }
    outputAliveServerCount++;
    CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNodes.size());
    CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
  } catch (Exception e) {
    LOG.warn("Error happened in nodesCheck", e);
  }
}