def handleHeartbeatResponse()

in client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala [153:222]


  /**
   * Reconciles the locally tracked worker status (`excludedWorkers` and `shuttingWorkers`)
   * with the worker status carried by an application heartbeat response.
   *
   * Responses whose status code is not `StatusCode.SUCCESS` are ignored. For a successful
   * response the method:
   *  1. drops local exclusions that are neither protected by the expire timeout nor still
   *     reported by the response;
   *  1. adds the excluded and unknown workers reported by the response that are not tracked yet;
   *  1. makes `shuttingWorkers` mirror the shutting-down workers of the response;
   *  1. notifies every registered listener whenever the response lists unknown or shutting-down
   *     workers, whether or not the local state changed;
   *  1. logs the resulting state if anything changed.
   *
   * @param res heartbeat response holding the excluded, unknown and shutting-down workers
   */
  def handleHeartbeatResponse(res: HeartbeatFromApplicationResponse): Unit = {
    if (res.statusCode == StatusCode.SUCCESS) {
      logDebug(s"Received Worker status from Primary, excluded workers: ${res.excludedWorkers} " +
        s"unknown workers: ${res.unknownWorkers}, shutdown workers: ${res.shuttingWorkers}")
      val current = System.currentTimeMillis()
      var statusChanged = false

      excludedWorkers.asScala.foreach {
        case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
          statusCode match {
            // Exclusions recorded with one of these codes are kept until
            // excludedWorkerExpireTimeout has elapsed since they were registered, even if the
            // response does not mention the worker. Both the PRIMARY and the REPLICA variant
            // of every push-data failure must be listed here.
            case StatusCode.WORKER_UNKNOWN |
                StatusCode.NO_AVAILABLE_WORKING_DIR |
                StatusCode.RESERVE_SLOTS_FAILED |
                StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY |
                StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA |
                StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY |
                StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA |
                StatusCode.PUSH_DATA_TIMEOUT_PRIMARY |
                StatusCode.PUSH_DATA_TIMEOUT_REPLICA
                if current - registerTime < excludedWorkerExpireTimeout => // reserve
            case _ =>
              // Any other code, or an expired entry: keep it only while the response still
              // reports the worker as excluded, shutting down or unknown.
              if (!res.excludedWorkers.contains(workerInfo) &&
                !res.shuttingWorkers.contains(workerInfo) &&
                !res.unknownWorkers.contains(workerInfo)) {
                excludedWorkers.remove(workerInfo)
                statusChanged = true
              }
          }
      }
      // Add newly reported workers; entries that are already tracked keep their original
      // status code and register time.
      for (worker <- res.excludedWorkers.asScala) {
        if (!excludedWorkers.containsKey(worker)) {
          excludedWorkers.put(worker, (StatusCode.WORKER_EXCLUDED, current))
          statusChanged = true
        }
      }
      for (worker <- res.unknownWorkers.asScala) {
        if (!excludedWorkers.containsKey(worker)) {
          excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
          statusChanged = true
        }
      }
      // retainAll followed by addAll leaves shuttingWorkers with the elements of
      // res.shuttingWorkers; each call returns true when it modified the collection.
      val retainResult = shuttingWorkers.retainAll(res.shuttingWorkers)
      val addResult = shuttingWorkers.addAll(res.shuttingWorkers)
      statusChanged = statusChanged || retainResult || addResult
      // Always trigger commit files for shutting down workers from HeartbeatFromApplicationResponse
      // See details in CELEBORN-696
      if (!res.unknownWorkers.isEmpty || !res.shuttingWorkers.isEmpty) {
        val workerStatus = new WorkersStatus(res.unknownWorkers, res.shuttingWorkers)
        workerStatusListeners.asScala.foreach { listener =>
          try {
            listener.notifyChangedWorkersStatus(workerStatus)
          } catch {
            // Best effort: a failing listener must not prevent the remaining ones from
            // being notified.
            case t: Throwable =>
              logError("Error while notify listener", t)
          }
        }
      }
      if (statusChanged) {
        logWarning("Worker status changed from application heartbeat response")
        logInfo(
          s"""
             |Current excluded workers:
             |${excludedWorkers.asScala.mkString("\n")}
             |
             |Current shutting down workers:
             |${shuttingWorkers.asScala.map(_.readableAddress()).mkString("\n")}
             |""".stripMargin)
      }
    }
  }