private def reserveSlots()

in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [1176:1277]


  /**
   * Sends a `ReserveSlots` request for `shuffleId` to every worker in `slots` that has at least
   * one primary or replica location, and reports the workers whose reservation failed.
   *
   * A worker is treated as failed when its endpoint is `null`, when its response status is not
   * `StatusCode.SUCCESS`, when the request future fails, or when no response has arrived once
   * `conf.rpcAskTimeout` of wall-clock time has elapsed after all requests were dispatched.
   * All failures are logged as one aggregated error and handed to
   * `workerStatusTracker.recordWorkerFailure`.
   *
   * @param shuffleId shuffle the slots are reserved for
   * @param slots per-worker primary and replica locations to reserve
   * @param isSegmentGranularityVisible forwarded unchanged in the `ReserveSlots` request
   * @return the workers for which the reservation failed; empty when every worker succeeded
   */
  private def reserveSlots(
      shuffleId: Int,
      slots: WorkerResource,
      isSegmentGranularityVisible: Boolean = false): util.List[WorkerInfo] = {
    val reserveSlotFailedWorkers = new ShuffleFailedWorkers()
    val failureInfos = new util.concurrent.CopyOnWriteArrayList[String]()

    // Single place that builds the per-worker failure line used in the aggregated error log.
    def failureMessage(workerInfo: WorkerInfo, reason: String): String =
      s"[reserveSlots] Failed to" +
        s" reserve buffers for shuffleId $shuffleId" +
        s" from worker ${workerInfo.readableAddress()}. Reason: $reason"

    // Records a worker whose request could not be completed (failure, timeout, null endpoint).
    def markRequestFailed(workerInfo: WorkerInfo, reason: String): Unit = {
      failureInfos.add(failureMessage(workerInfo, reason))
      reserveSlotFailedWorkers.put(
        workerInfo,
        (StatusCode.REQUEST_FAILED, System.currentTimeMillis()))
    }

    // Workers without any primary or replica location have nothing to reserve.
    val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty || !p._2._2.isEmpty)

    val (locsWithNullEndpoint, locs) = workerPartitionLocations.partition(_._1.endpoint == null)
    val futures = new LinkedBlockingQueue[(Future[ReserveSlotsResponse], WorkerInfo)]()
    // Issue the asks on `ec` so they are dispatched concurrently; each task only enqueues the
    // response future, it does not wait for the response.
    // NOTE(review): if `ask` throws synchronously the wrapping future fails; how `awaitResult`
    // surfaces that is not visible from this method.
    val outFutures = locs.map { case (workerInfo, (primaryLocations, replicaLocations)) =>
      Future {
        val future = workerInfo.endpoint.ask[ReserveSlotsResponse](
          ReserveSlots(
            appUniqueId,
            shuffleId,
            primaryLocations,
            replicaLocations,
            partitionSplitThreshold,
            partitionSplitMode,
            getPartitionType(shuffleId),
            rangeReadFilter,
            userIdentifier,
            conf.pushDataTimeoutMs,
            partitionSplitEnabled = true,
            isSegmentGranularityVisible = isSegmentGranularityVisible))
        futures.add((future, workerInfo))
      }(ec)
    }
    val cbf =
      implicitly[
        CanBuildFrom[mutable.Iterable[Future[Boolean]], Boolean, mutable.Iterable[Boolean]]]
    val futureSeq = Future.sequence(outFutures)(cbf, ec)
    // Wait until every request has been dispatched and its response future enqueued.
    awaitResult(futureSeq, Duration.Inf)

    // Poll the response futures until all are resolved or the ask timeout has elapsed. The
    // budget is measured against a monotonic clock so the time spent scanning the queue counts
    // towards it, and the queue is scanned once more after the final sleep so a response that
    // arrived during that sleep is not misreported as a timeout.
    val pollIntervalMs = 50L
    val timeoutNanos =
      util.concurrent.TimeUnit.MILLISECONDS.toNanos(conf.rpcAskTimeout.duration.toMillis)
    val startNanos = System.nanoTime()
    var deadlineReached = false
    while (!futures.isEmpty && !deadlineReached) {
      val iter = futures.iterator()
      while (iter.hasNext) {
        val (future, workerInfo) = iter.next()
        future.value match {
          case Some(scala.util.Success(res)) =>
            if (res.status == StatusCode.SUCCESS) {
              logDebug(s"Successfully allocated " +
                s"partitions buffer for shuffleId $shuffleId" +
                s" from worker ${workerInfo.readableAddress()}.")
            } else {
              // Keep the status reported by the worker rather than a generic one.
              failureInfos.add(failureMessage(workerInfo, s"${res.reason}"))
              reserveSlotFailedWorkers.put(workerInfo, (res.status, System.currentTimeMillis()))
            }
            iter.remove()
          case Some(scala.util.Failure(e)) =>
            markRequestFailed(workerInfo, s"$e")
            iter.remove()
          case None =>
          // Response still pending; look at it again on the next pass.
        }
      }

      if (!futures.isEmpty) {
        val remainingMs = util.concurrent.TimeUnit.NANOSECONDS.toMillis(
          timeoutNanos - (System.nanoTime() - startNanos))
        if (remainingMs <= 0) {
          deadlineReached = true
        } else {
          // Never sleep past the deadline.
          Thread.sleep(math.min(pollIntervalMs, remainingMs))
        }
      }
    }

    // Whatever is still queued did not answer in time.
    var pending = futures.poll()
    while (pending != null) {
      markRequestFailed(pending._2, "Timeout")
      pending = futures.poll()
    }

    // Workers that could not be contacted at all because they have no endpoint.
    locsWithNullEndpoint.foreach { case (workerInfo, (_, _)) =>
      markRequestFailed(workerInfo, "null endpoint")
    }

    if (!failureInfos.isEmpty) {
      logError(s"Aggregated error of reserveSlots for " +
        s"shuffleId $shuffleId " +
        s"failure:${failureInfos.asScala.mkString(" \n ", " \n ", "")}")
    }
    workerStatusTracker.recordWorkerFailure(reserveSlotFailedWorkers)
    new util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
  }