in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [1176:1277]
/**
 * Sends a ReserveSlots request to every worker in `slots` that has at least one primary or
 * replica location, then polls the replies until all have arrived or the RPC ask timeout
 * (`conf.rpcAskTimeout`) has been used up.
 *
 * A worker is recorded as failed when its endpoint is null, when its reply carries a status
 * other than SUCCESS, when its request future fails, or when no reply arrived in time. All
 * failures are logged as a single aggregated error and reported through
 * `workerStatusTracker.recordWorkerFailure`.
 *
 * @param shuffleId shuffle the slots are reserved for
 * @param slots per-worker primary and replica locations to reserve
 * @param isSegmentGranularityVisible forwarded unchanged in the ReserveSlots request
 * @return the workers whose reservation failed; empty when every worker succeeded
 */
private def reserveSlots(
    shuffleId: Int,
    slots: WorkerResource,
    isSegmentGranularityVisible: Boolean = false): util.List[WorkerInfo] = {
  val reserveSlotFailedWorkers = new ShuffleFailedWorkers()
  val failureInfos = new util.concurrent.CopyOnWriteArrayList[String]()

  // Records one failed worker: the message for the aggregated log first, then the map entry.
  def markFailed(workerInfo: WorkerInfo, status: StatusCode, reason: String): Unit = {
    failureInfos.add(s"[reserveSlots] Failed to" +
      s" reserve buffers for shuffleId $shuffleId" +
      s" from worker ${workerInfo.readableAddress()}. Reason: $reason")
    reserveSlotFailedWorkers.put(workerInfo, (status, System.currentTimeMillis()))
  }

  // Classifies a completed request: SUCCESS is only logged, everything else is a failure.
  def recordOutcome(
      workerInfo: WorkerInfo,
      outcome: scala.util.Try[ReserveSlotsResponse]): Unit = outcome match {
    case scala.util.Success(res) if res.status == StatusCode.SUCCESS =>
      logDebug(s"Successfully allocated " +
        s"partitions buffer for shuffleId $shuffleId" +
        s" from worker ${workerInfo.readableAddress()}.")
    case scala.util.Success(res) =>
      markFailed(workerInfo, res.status, res.reason)
    case scala.util.Failure(e) =>
      markFailed(workerInfo, StatusCode.REQUEST_FAILED, e.toString)
  }

  // Workers without any assigned location are skipped entirely.
  val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty || !p._2._2.isEmpty)
  val (locsWithNullEndpoint, locs) = workerPartitionLocations.partition(_._1.endpoint == null)

  // Each outer future only issues the ask and enqueues the reply future; the replies
  // themselves are collected by the polling loop below.
  val futures = new LinkedBlockingQueue[(Future[ReserveSlotsResponse], WorkerInfo)]()
  val outFutures = locs.map { case (workerInfo, (primaryLocations, replicaLocations)) =>
    Future {
      val future = workerInfo.endpoint.ask[ReserveSlotsResponse](
        ReserveSlots(
          appUniqueId,
          shuffleId,
          primaryLocations,
          replicaLocations,
          partitionSplitThreshold,
          partitionSplitMode,
          getPartitionType(shuffleId),
          rangeReadFilter,
          userIdentifier,
          conf.pushDataTimeoutMs,
          partitionSplitEnabled = true,
          isSegmentGranularityVisible = isSegmentGranularityVisible))
      futures.add((future, workerInfo))
    }(ec)
  }
  val cbf =
    implicitly[
      CanBuildFrom[mutable.Iterable[Future[Boolean]], Boolean, mutable.Iterable[Boolean]]]
  val futureSeq = Future.sequence(outFutures)(cbf, ec)
  // Wait until every request has been issued and enqueued before polling for replies.
  awaitResult(futureSeq, Duration.Inf)

  // Poll in `delta` ms steps; the budget is decremented once per pass.
  var timeout = conf.rpcAskTimeout.duration.toMillis
  val delta = 50
  while (timeout >= 0 && !futures.isEmpty) {
    val iter = futures.iterator()
    while (iter.hasNext) {
      val (future, workerInfo) = iter.next()
      future.value match {
        case Some(outcome) =>
          recordOutcome(workerInfo, outcome)
          iter.remove()
        case None => // still pending, check again on the next pass
      }
    }
    if (!futures.isEmpty) {
      Thread.sleep(delta)
    }
    timeout = timeout - delta
  }

  // Drain whatever is left. A reply may have landed during the final sleep, so its real
  // outcome is used when available; only requests that are still pending count as timeouts.
  val leftovers = futures.iterator()
  while (leftovers.hasNext) {
    val (future, workerInfo) = leftovers.next()
    future.value match {
      case Some(outcome) => recordOutcome(workerInfo, outcome)
      case None => markFailed(workerInfo, StatusCode.REQUEST_FAILED, "Timeout")
    }
    leftovers.remove()
  }

  // Workers that could not be contacted at all because they have no endpoint.
  locsWithNullEndpoint.foreach { case (workerInfo, _) =>
    markFailed(workerInfo, StatusCode.REQUEST_FAILED, "null endpoint")
  }

  if (!failureInfos.isEmpty) {
    // Every entry is prefixed with " \n " so each failure starts on its own line.
    val details = failureInfos.asScala.mkString(" \n ", " \n ", "")
    logError(s"Aggregated error of reserveSlots for " +
      s"shuffleId $shuffleId " +
      s"failure:$details")
  }
  workerStatusTracker.recordWorkerFailure(reserveSlotFailedWorkers)
  new util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
}