in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java [246:306]
/**
 * Applies a worker heartbeat to the cluster metadata.
 *
 * <p>For a worker already present in {@code workersMap} this refreshes its disk infos, last
 * heartbeat time and status. It then clears a pending worker event once the reported status meets
 * that event's final state (also dropping the worker from {@code shutdownWorkers} when the new
 * state is {@code Normal}), decides whether the worker belongs in the excluded-worker set, and
 * finally refreshes the available workers when the reported state is {@code Normal}.
 *
 * @param host worker host; together with the ports it identifies the worker
 * @param rpcPort worker RPC port
 * @param pushPort worker push port
 * @param fetchPort worker fetch port
 * @param replicatePort worker replicate port
 * @param disks disk infos reported by the worker (keys presumably disk mount points — verify
 *     against caller); may be empty
 * @param time timestamp stored as the worker's last heartbeat
 * @param workerStatus status reported by the worker
 * @param highWorkload whether the worker reports a high workload; such a worker is excluded
 */
public void updateWorkerHeartbeatMeta(
    String host,
    int rpcPort,
    int pushPort,
    int fetchPort,
    int replicatePort,
    Map<String, DiskInfo> disks,
    long time,
    WorkerStatus workerStatus,
    boolean highWorkload) {
  WorkerInfo worker =
      new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, null);
  long availableSlots = 0L;
  LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
  synchronized (workersMap) {
    WorkerInfo info = workersMap.get(worker.toUniqueId());
    // Only registered workers are updated; an unknown worker keeps availableSlots == 0.
    if (info != null) {
      info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
      availableSlots = info.totalAvailableSlots();
      info.lastHeartbeat_$eq(time);
      info.setWorkerStatus(workerStatus);
    }
  }

  WorkerEventInfo workerEventInfo = workerEventInfos.get(worker);
  if (workerEventInfo != null
      && WorkerStatusUtils.meetFinalState(workerEventInfo, workerStatus)) {
    workerEventInfos.remove(worker);
    if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
      shutdownWorkers.remove(worker);
    }
  }

  // When a remote storage backend (HDFS, S3 or OSS) is configured, empty or unhealthy local
  // disks alone must not put the worker into the excluded worker list.
  boolean hasRemoteStorage =
      conf.hasHDFSStorage() || conf.hasS3Storage() || conf.hasOssStorage();
  long unhealthyDiskNum =
      disks.values().stream().filter(s -> !s.status().equals(DiskStatus.HEALTHY)).count();
  // Guard the ratio so an empty disk map never evaluates 0.0 / 0 (NaN); the empty case is
  // handled explicitly in shouldExclude.
  boolean exceed =
      !disks.isEmpty() && unhealthyDiskNum * 1.0 / disks.size() >= unhealthyDiskRatioThreshold;
  boolean shouldExclude = ((disks.isEmpty() || exceed) && !hasRemoteStorage) || highWorkload;

  if (shouldExclude) {
    // Add only on the transition so the warning is logged once. A worker that is already
    // excluded and still meets the exclusion criteria must stay excluded. Previously it fell
    // through to the removal branch whenever it had any available slots, so a worker with some
    // healthy disks but an over-threshold unhealthy ratio flapped in and out of the excluded
    // set on alternate heartbeats.
    if (!excludedWorkers.contains(worker)) {
      LOG.warn(
          "Worker {} (unhealthy disks num: {}) adds to excluded workers", worker, unhealthyDiskNum);
      excludedWorkers.add(worker);
    }
  } else if (availableSlots > 0 || hasRemoteStorage) {
    // Only un-exclude a worker that can accept slots, either locally or via remote storage.
    // (!shouldExclude already implies !highWorkload.)
    excludedWorkers.remove(worker);
  }

  // try to update the available workers if the worker status is Normal
  if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
    updateAvailableWorkers(worker);
  }
}