public void updateWorkerHeartbeatMeta()

in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java [246:306]


  public void updateWorkerHeartbeatMeta(
      String host,
      int rpcPort,
      int pushPort,
      int fetchPort,
      int replicatePort,
      Map<String, DiskInfo> disks,
      long time,
      WorkerStatus workerStatus,
      boolean highWorkload) {
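    // Build a transient WorkerInfo from the heartbeat fields; it is used as the lookup
    // key (unique id) for workersMap and as the key for workerEventInfos/excludedWorkers.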
    WorkerInfo worker =
        new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, null);
    AtomicLong availableSlots = new AtomicLong();
    LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
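    // If the worker is registered, refresh its disk infos, recompute available slots,
    // and record the heartbeat time and reported status under the workersMap lock.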
    synchronized (workersMap) {
      Optional<WorkerInfo> workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId()));
      workerInfo.ifPresent(
          info -> {
            info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
            availableSlots.set(info.totalAvailableSlots());
            info.lastHeartbeat_$eq(time);
            info.setWorkerStatus(workerStatus);
          });
    }

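    // Drop any pending worker event once the reported status reaches its final state;
    // a worker back in Normal state is also removed from the shutdown list.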
    WorkerEventInfo workerEventInfo = workerEventInfos.get(worker);
    if (workerEventInfo != null
        && WorkerStatusUtils.meetFinalState(workerEventInfo, workerStatus)) {
      workerEventInfos.remove(worker);
      if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
        shutdownWorkers.remove(worker);
      }
    }

    // When only remote storage (HDFS/S3/OSS) is configured, workers with empty disks
    // should not be put into the excluded worker list.
    long unhealthyDiskNum =
        disks.values().stream().filter(s -> !s.status().equals(DiskStatus.HEALTHY)).count();
    boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >= unhealthyDiskRatioThreshold;
    if (!excludedWorkers.contains(worker)
        && (((disks.isEmpty() || exceed)
                && !conf.hasHDFSStorage()
                && !conf.hasS3Storage()
                && !conf.hasOssStorage())
            || highWorkload)) {
      LOG.warn(
          "Worker {} (unhealthy disks num: {}) added to excluded workers", worker, unhealthyDiskNum);
      excludedWorkers.add(worker);
    } else if ((availableSlots.get() > 0
            || conf.hasHDFSStorage()
            || conf.hasS3Storage()
            || conf.hasOssStorage())
        && !highWorkload) {
      // Only remove the worker from excluded workers if it has available slots
      // (or remote storage is configured) and it is not under high workload.
      excludedWorkers.remove(worker);
    }

    // try to update the available workers if the worker status is Normal
    if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
      updateAvailableWorkers(worker);
    }
  }
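
For reference, a minimal standalone sketch of the disk-based part of the exclusion decision above. The class and member names (ExclusionRatioSketch, DiskHealth, shouldExclude) are hypothetical and only illustrate the ratio check; the real method additionally consults conf.hasHDFSStorage()/hasS3Storage()/hasOssStorage() and the highWorkload flag.

  import java.util.Map;

  public class ExclusionRatioSketch {
    enum DiskHealth { HEALTHY, UNHEALTHY }

    // Exclude when the worker reports no disks at all, or when the fraction of
    // unhealthy disks reaches the configured threshold.
    static boolean shouldExclude(Map<String, DiskHealth> disks, double unhealthyDiskRatioThreshold) {
      if (disks.isEmpty()) {
        return true;
      }
      long unhealthy = disks.values().stream().filter(h -> h != DiskHealth.HEALTHY).count();
      return unhealthy * 1.0 / disks.size() >= unhealthyDiskRatioThreshold;
    }

    public static void main(String[] args) {
      // Two disks, one unhealthy, threshold 0.5 -> excluded (prints true).
      System.out.println(
          shouldExclude(Map.of("disk1", DiskHealth.HEALTHY, "disk2", DiskHealth.UNHEALTHY), 0.5));
      // Two healthy disks -> not excluded (prints false).
      System.out.println(
          shouldExclude(Map.of("disk1", DiskHealth.HEALTHY, "disk2", DiskHealth.HEALTHY), 0.5));
    }
  }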