def excludeWorkerFromPartition()

in client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala [72:119]


  /**
   * Collects the worker implicated by a push failure on `oldPartition` and reports it via
   * `recordWorkerFailure`.
   *
   * Which worker is collected depends on `cause`:
   *  - `*_PRIMARY` push failures (write fail, create-connection fail, connection exception,
   *    timeout) collect the worker hosting `oldPartition`.
   *  - The matching `*_REPLICA` failures collect the worker hosting `oldPartition.getPeer`.
   *    This happens only when the partition has a peer and
   *    `conf.clientExcludeReplicaOnFailureEnabled` is set.
   *  - Any other `cause`, or a null `oldPartition`, collects nothing.
   *
   * A worker that is not present in the shuffle's worker snapshot is skipped. Each collected
   * worker is stored with the failure `cause` and the current wall-clock time in millis.
   * `recordWorkerFailure` is invoked in every case, possibly with an empty collection.
   *
   * @param shuffleId    shuffle whose worker snapshot is used to resolve the failed worker
   * @param oldPartition partition location whose push failed; may be null
   * @param cause        status code describing the push failure
   */
  def excludeWorkerFromPartition(
      shuffleId: Int,
      oldPartition: PartitionLocation,
      cause: StatusCode): Unit = {
    val failedWorker = new ShuffleFailedWorkers()

    // Resolve the partition's worker against this shuffle's snapshot and tag it with the
    // failure status and the current time. Workers absent from the snapshot are ignored.
    def excludeWorker(partition: PartitionLocation, statusCode: StatusCode): Unit = {
      val workerId = partition.getWorker.toUniqueId
      lifecycleManager.workerSnapshots(shuffleId).asScala
        .get(workerId)
        .map(_.workerInfo)
        .foreach(worker => failedWorker.put(worker, (statusCode, System.currentTimeMillis())))
    }

    // Replica-side failures exclude the peer only if one exists and the client opts in.
    // Declared as a def so it is evaluated only when a replica status code actually matches.
    def replicaExclusionAllowed: Boolean =
      oldPartition.hasPeer && conf.clientExcludeReplicaOnFailureEnabled

    if (oldPartition != null) {
      cause match {
        case StatusCode.PUSH_DATA_WRITE_FAIL_PRIMARY |
            StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY |
            StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY |
            StatusCode.PUSH_DATA_TIMEOUT_PRIMARY =>
          excludeWorker(oldPartition, cause)
        case StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA |
            StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA |
            StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA |
            StatusCode.PUSH_DATA_TIMEOUT_REPLICA if replicaExclusionAllowed =>
          excludeWorker(oldPartition.getPeer, cause)
        case _ =>
        // Other status codes (or a replica failure that is not eligible) exclude nothing.
      }
    }
    recordWorkerFailure(failedWorker)
  }