in client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala [150:215]
/**
 * Reconciles the locally tracked worker status with the worker status reported by the primary
 * in an application heartbeat response. Responses whose `statusCode` is not SUCCESS are ignored.
 *
 *  - A tracked entry in `excludedWorkers` is removed once the primary no longer reports that
 *    worker as excluded, shutting down or unknown. Exception: entries carrying one of the
 *    failure codes listed in the first `case` below are kept (whatever the primary says) while
 *    `current - registerTime` is still below `excludedWorkerExpireTimeout`.
 *  - Workers the primary reports as excluded or unknown that are not tracked yet are added with
 *    `WORKER_EXCLUDED` / `WORKER_UNKNOWN` and the current time; already-tracked entries keep
 *    their existing status code and timestamp.
 *  - `shuttingWorkers` is made to contain exactly the primary's shutting-down workers.
 *  - Registered listeners are notified whenever the response carries any unknown or
 *    shutting-down workers, even when nothing changed locally.
 *  - A warning is logged if any of the above changed the local state.
 *
 * @param res heartbeat response received from the primary
 */
def handleHeartbeatResponse(res: HeartbeatFromApplicationResponse): Unit = {
  if (res.statusCode == StatusCode.SUCCESS) {
    logDebug(s"Received Worker status from Primary, excluded workers: ${res.excludedWorkers} " +
      s"unknown workers: ${res.unknownWorkers}, shutdown workers: ${res.shuttingWorkers}")
    val current = System.currentTimeMillis()
    var statusChanged = false

    // Every worker the primary currently considers excluded, shutting down or unknown.
    // Built once so the loop below does hash lookups instead of scanning three lists per entry.
    val reportedByPrimary =
      (res.excludedWorkers.asScala ++
        res.shuttingWorkers.asScala ++
        res.unknownWorkers.asScala).toSet

    excludedWorkers.asScala.foreach {
      case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
        statusCode match {
          case StatusCode.WORKER_UNKNOWN |
              StatusCode.NO_AVAILABLE_WORKING_DIR |
              StatusCode.RESERVE_SLOTS_FAILED |
              StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY |
              StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA |
              StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY |
              StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA |
              StatusCode.PUSH_DATA_TIMEOUT_PRIMARY |
              StatusCode.PUSH_DATA_TIMEOUT_REPLICA
              if current - registerTime < excludedWorkerExpireTimeout =>
          // Reserve: still within the expire timeout, keep the entry regardless of the primary.
          case _ =>
            if (!reportedByPrimary.contains(workerInfo)) {
              excludedWorkers.remove(workerInfo)
              statusChanged = true
            }
        }
    }

    for (worker <- res.excludedWorkers.asScala) {
      if (!excludedWorkers.containsKey(worker)) {
        excludedWorkers.put(worker, (StatusCode.WORKER_EXCLUDED, current))
        statusChanged = true
      }
    }
    for (worker <- res.unknownWorkers.asScala) {
      if (!excludedWorkers.containsKey(worker)) {
        excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
        statusChanged = true
      }
    }

    // Both calls are evaluated unconditionally (no short-circuit), so the set ends up equal to
    // the primary's shutting-down list.
    val retainShuttingWorkersResult = shuttingWorkers.retainAll(res.shuttingWorkers)
    val addShuttingWorkersResult = shuttingWorkers.addAll(res.shuttingWorkers)
    statusChanged =
      statusChanged || retainShuttingWorkersResult || addShuttingWorkersResult

    // Always trigger commit files for shutting down workers from HeartbeatFromApplicationResponse
    // See details in CELEBORN-696
    if (!res.unknownWorkers.isEmpty || !res.shuttingWorkers.isEmpty) {
      val workerStatus = new WorkersStatus(res.unknownWorkers, res.shuttingWorkers)
      workerStatusListeners.asScala.foreach { listener =>
        try {
          listener.notifyChangedWorkersStatus(workerStatus)
        } catch {
          // Contain ordinary failures so one broken listener does not stop the others, but let
          // fatal errors (OOM, InterruptedException, LinkageError, ...) propagate.
          case scala.util.control.NonFatal(t) =>
            logError("Error while notify listener", t)
        }
      }
    }

    if (statusChanged) {
      logWarning(
        s"Worker status changed from application heartbeat response.$currentFailedWorkers")
    }
  }
}