def handleHeartbeatResponse()

in client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala [150:215]


  /**
   * Applies an application-heartbeat response to the locally tracked worker state.
   *
   * Responses whose status code is not `StatusCode.SUCCESS` are ignored. For a successful
   * response this method, in order:
   *  1. removes entries from `excludedWorkers` that the response no longer reports as excluded,
   *     shutting down or unknown, except entries that carry one of the status codes listed in
   *     the match below and are younger than `excludedWorkerExpireTimeout`;
   *  2. adds the response's excluded and unknown workers that are not tracked yet, stamped
   *     with the current time;
   *  3. synchronises `shuttingWorkers` with `res.shuttingWorkers` (retain, then add);
   *  4. notifies every registered listener whenever the response reports at least one unknown
   *     or shutting-down worker, even if nothing changed locally;
   *  5. logs a warning if any of the steps 1-3 modified the tracked state.
   *
   * A listener that throws a non-fatal exception is logged and skipped so the remaining
   * listeners are still notified; fatal errors (as classified by `NonFatal`) propagate.
   *
   * @param res heartbeat response to apply
   */
  def handleHeartbeatResponse(res: HeartbeatFromApplicationResponse): Unit = {
    // Method-local import so this block does not depend on the file's import section.
    import scala.util.control.NonFatal

    if (res.statusCode == StatusCode.SUCCESS) {
      logDebug(s"Received Worker status from Primary, excluded workers: ${res.excludedWorkers} " +
        s"unknown workers: ${res.unknownWorkers}, shutdown workers: ${res.shuttingWorkers}")
      val current = System.currentTimeMillis()
      var statusChanged = false

      // Step 1: prune entries that are no longer reported by the response.
      excludedWorkers.asScala.foreach {
        case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
          statusCode match {
            // The guard applies to every alternative of this pattern: an entry with one of
            // these codes is kept untouched until it is older than the expire timeout.
            // Once the guard fails, the entry falls through to the default case below.
            case StatusCode.WORKER_UNKNOWN |
                StatusCode.NO_AVAILABLE_WORKING_DIR |
                StatusCode.RESERVE_SLOTS_FAILED |
                StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY |
                StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA |
                StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY |
                StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA |
                StatusCode.PUSH_DATA_TIMEOUT_PRIMARY |
                StatusCode.PUSH_DATA_TIMEOUT_REPLICA
                if current - registerTime < excludedWorkerExpireTimeout => // reserve
            case _ =>
              // Drop the entry only when the response mentions the worker in none of its lists.
              if (!res.excludedWorkers.contains(workerInfo) &&
                !res.shuttingWorkers.contains(workerInfo) &&
                !res.unknownWorkers.contains(workerInfo)) {
                excludedWorkers.remove(workerInfo)
                statusChanged = true
              }
          }
      }

      // Step 2: start tracking newly reported workers; existing entries keep their
      // original status code and timestamp.
      for (worker <- res.excludedWorkers.asScala) {
        if (!excludedWorkers.containsKey(worker)) {
          excludedWorkers.put(worker, (StatusCode.WORKER_EXCLUDED, current))
          statusChanged = true
        }
      }
      for (worker <- res.unknownWorkers.asScala) {
        if (!excludedWorkers.containsKey(worker)) {
          excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
          statusChanged = true
        }
      }

      // Step 3: both calls are evaluated eagerly into vals so that neither mutation is
      // skipped by the short-circuiting `||` below.
      val retainShuttingWorkersResult = shuttingWorkers.retainAll(res.shuttingWorkers)
      val addShuttingWorkersResult = shuttingWorkers.addAll(res.shuttingWorkers)

      statusChanged =
        statusChanged || retainShuttingWorkersResult || addShuttingWorkersResult
      // Always trigger commit files for shutting down workers from HeartbeatFromApplicationResponse
      // See details in CELEBORN-696
      if (!res.unknownWorkers.isEmpty || !res.shuttingWorkers.isEmpty) {
        val workerStatus = new WorkersStatus(res.unknownWorkers, res.shuttingWorkers)
        workerStatusListeners.asScala.foreach { listener =>
          try {
            listener.notifyChangedWorkersStatus(workerStatus)
          } catch {
            // Isolate listeners from each other, but let fatal errors (OOM, interruption,
            // linkage errors, control-flow throwables) reach the caller instead of hiding them.
            case NonFatal(t) =>
              logError("Error while notify listener", t)
          }
        }
      }
      if (statusChanged) {
        logWarning(
          s"Worker status changed from application heartbeat response.$currentFailedWorkers")
      }
    }
  }