def destroySlotsWithRetry()

in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [1569:1659]


  /**
   * Asks every worker in `slotsToDestroy` that has a non-null endpoint to destroy the given
   * primary/replica slots of a shuffle, retrying each request individually.
   *
   * All requests are sent up front and then polled every 50 ms. A request is re-sent when its
   * future completed with a non-SUCCESS status, completed with a failure, or has been pending
   * longer than `conf.rpcAskTimeout`, as long as it has been attempted fewer than
   * `rpcMaxRetires` times. A request is dropped from the pending list once it succeeds or once
   * its attempts are exhausted.
   *
   * The method returns when no request is pending or when the overall polling budget
   * (`rpcAskTimeout * rpcMaxRetires`, decremented by the sleep interval per polling round) is
   * used up. Requests still pending at that point are discarded without further logging.
   *
   * @param shuffleId      id of the shuffle whose slots are destroyed; combined with
   *                       `appUniqueId` to build the shuffle key sent to workers
   * @param slotsToDestroy per-worker primary and replica locations to destroy
   */
  def destroySlotsWithRetry(
      shuffleId: Int,
      slotsToDestroy: WorkerResource): Unit = {
    val shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId)

    // Re-sends the request tracked by `status` and resets its attempt bookkeeping.
    def retryDestroy(status: DestroyFutureWithStatus, currentTime: Long): Unit = {
      status.retryTimes += 1
      status.startTime = currentTime
      // mock failure if mockDestroyFailure is true and this is not the last retry
      status.message.mockFailure =
        status.message.mockFailure && (status.retryTimes != rpcMaxRetires)
      status.future =
        status.endpoint.ask[DestroyWorkerSlotsResponse](status.message)
    }

    val startTime = System.currentTimeMillis()
    val futures = new util.LinkedList[DestroyFutureWithStatus]()
    // Fire the first attempt for every worker that has an endpoint; workers without one are skipped.
    slotsToDestroy.asScala.filter(_._1.endpoint != null).foreach {
      case (workerInfo, (primaryLocations, replicaLocations)) =>
        val primaryIds = primaryLocations.asScala.map(_.getUniqueId).asJava
        val replicaIds = replicaLocations.asScala.map(_.getUniqueId).asJava
        val destroy = DestroyWorkerSlots(shuffleKey, primaryIds, replicaIds, mockDestroyFailure)
        val future = workerInfo.endpoint.ask[DestroyWorkerSlotsResponse](destroy)
        futures.add(DestroyFutureWithStatus(future, destroy, workerInfo.endpoint, 1, startTime))
    }

    // Per-attempt timeout in milliseconds.
    val timeout = conf.rpcAskTimeout.duration.toMillis
    // Overall polling budget; only the sleep interval is subtracted per round, so time spent
    // inside a polling round itself is not counted against it.
    var remainingTime = timeout * rpcMaxRetires
    // Polling interval in milliseconds.
    val delta = 50
    while (remainingTime > 0 && !futures.isEmpty) {
      val currentTime = System.currentTimeMillis()
      val iter = futures.iterator()
      while (iter.hasNext) {
        val futureWithStatus = iter.next()
        val message = futureWithStatus.message
        val retryTimes = futureWithStatus.retryTimes
        if (futureWithStatus.future.isCompleted) {
          // `value` is defined because the future is completed.
          futureWithStatus.future.value.get match {
            case scala.util.Success(res) =>
              if (res.status != StatusCode.SUCCESS && retryTimes < rpcMaxRetires) {
                logError(
                  s"Request $message to ${futureWithStatus.endpoint} return ${res.status} for $shuffleKey $retryTimes/$rpcMaxRetires, " +
                    "will retry.")
                retryDestroy(futureWithStatus, currentTime)
              } else {
                // Either the worker reported SUCCESS or the attempts are exhausted.
                if (res.status != StatusCode.SUCCESS && retryTimes == rpcMaxRetires) {
                  logError(
                    s"Request $message to ${futureWithStatus.endpoint} return ${res.status} for $shuffleKey $retryTimes/$rpcMaxRetires, " +
                      "will not retry.")
                }
                iter.remove()
              }
            case scala.util.Failure(e) =>
              if (retryTimes < rpcMaxRetires) {
                logError(
                  s"Request $message to ${futureWithStatus.endpoint} failed $retryTimes/$rpcMaxRetires for $shuffleKey, reason: $e, " +
                    "will retry.")
                retryDestroy(futureWithStatus, currentTime)
              } else {
                if (retryTimes == rpcMaxRetires) {
                  logError(
                    s"Request $message to ${futureWithStatus.endpoint} failed $retryTimes/$rpcMaxRetires for $shuffleKey, reason: $e, " +
                      "will not retry.")
                }
                iter.remove()
              }
          }
        } else if (currentTime - futureWithStatus.startTime > timeout) {
          // The current attempt has been pending longer than the per-attempt timeout.
          if (retryTimes < rpcMaxRetires) {
            logError(
              s"Request $message to ${futureWithStatus.endpoint} failed $retryTimes/$rpcMaxRetires for $shuffleKey, reason: Timeout, " +
                "will retry.")
            retryDestroy(futureWithStatus, currentTime)
          } else {
            if (retryTimes == rpcMaxRetires) {
              // Last attempt timed out: the request is dropped below, so it is not retried.
              logError(
                s"Request $message to ${futureWithStatus.endpoint} failed $retryTimes/$rpcMaxRetires for $shuffleKey, reason: Timeout, " +
                  "will not retry.")
            }
            iter.remove()
          }
        }
      }

      // Only wait when something is still pending.
      if (!futures.isEmpty) {
        Thread.sleep(delta)
        remainingTime -= delta
      }
    }
    // Drop whatever is still pending once the polling budget is exhausted.
    futures.clear()
  }