in client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala [72:119]
/**
 * Records a worker failure derived from a failed push to `oldPartition`.
 *
 * For a primary-side `cause`, the worker hosting `oldPartition` is recorded. For a
 * replica-side `cause`, the worker hosting the partition's peer is recorded, but only when
 * the partition has a peer and `conf.clientExcludeReplicaOnFailureEnabled` is set. A worker
 * is recorded only if it appears in the worker snapshot of `shuffleId`. Any other cause, or
 * a null `oldPartition`, records nothing. `recordWorkerFailure` is invoked in every case,
 * even when nothing was recorded.
 *
 * @param shuffleId    shuffle whose worker snapshot is used to resolve the failed worker
 * @param oldPartition partition location whose push failed; may be null
 * @param cause        status code identifying the failing side (primary/replica) and kind
 */
def excludeWorkerFromPartition(
    shuffleId: Int,
    oldPartition: PartitionLocation,
    cause: StatusCode): Unit = {
  val failedWorker = new ShuffleFailedWorkers()

  // Marks the worker hosting `partition` as failed with `statusCode`, stamped with the
  // current time. Workers absent from this shuffle's snapshot are ignored.
  def excludeWorker(partition: PartitionLocation, statusCode: StatusCode): Unit = {
    val uniqueId = partition.getWorker.toUniqueId
    // NOTE(review): wrapped in Option defensively in case workerSnapshots returns null for
    // this shuffleId (presumably if the shuffle was already unregistered) — confirm.
    Option(lifecycleManager.workerSnapshots(shuffleId))
      .flatMap(_.asScala.get(uniqueId))
      .map(_.workerInfo)
      .foreach(worker => failedWorker.put(worker, (statusCode, System.currentTimeMillis())))
  }

  if (oldPartition != null) {
    cause match {
      // Primary-side failures: blame the worker hosting the partition itself.
      case StatusCode.PUSH_DATA_WRITE_FAIL_PRIMARY |
          StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY |
          StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY |
          StatusCode.PUSH_DATA_TIMEOUT_PRIMARY =>
        excludeWorker(oldPartition, cause)
      // Replica-side failures: blame the peer's worker, only when a peer exists and
      // replica exclusion is enabled. If the guard fails, fall through to `case _`.
      case StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA |
          StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA |
          StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA |
          StatusCode.PUSH_DATA_TIMEOUT_REPLICA
          if oldPartition.hasPeer && conf.clientExcludeReplicaOnFailureEnabled =>
        excludeWorker(oldPartition.getPeer, cause)
      // Any other status code does not mark a worker as failed.
      case _ =>
    }
  }
  recordWorkerFailure(failedWorker)
}