in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [1394:1476]
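/**
 * Reserves slots for the given shuffle on the candidate workers, retrying up to
 * `clientReserveSlotsMaxRetries` times with `clientReserveSlotsRetryWait` between attempts.
 * Workers that fail to reserve are dropped from the candidates, their partition locations
 * (and, when replication is enabled, the corresponding peer locations) are released, and the
 * failed partitions are re-allocated on the remaining candidates. If reservation still fails
 * after all retries, every slot reserved so far is destroyed.
 *
 * @return true if all slots were reserved successfully, false otherwise.
 */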
def reserveSlotsWithRetry(
shuffleId: Int,
candidates: util.HashSet[WorkerInfo],
slots: WorkerResource,
updateEpoch: Boolean = true,
isSegmentGranularityVisible: Boolean = false): Boolean = {
var requestSlots = slots
val reserveSlotsMaxRetries = conf.clientReserveSlotsMaxRetries
val reserveSlotsRetryWait = conf.clientReserveSlotsRetryWait
var retryTimes = 1
var noAvailableSlots = false
var success = false
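// Retry until reservation succeeds, retries are exhausted, or no candidate workers remain.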
while (retryTimes <= reserveSlotsMaxRetries && !success && !noAvailableSlots) {
if (retryTimes > 1) {
Thread.sleep(reserveSlotsRetryWait)
}
// reserve buffers
logInfo(s"Try reserve slots for $shuffleId for $retryTimes times.")
val reserveFailedWorkers = reserveSlots(shuffleId, requestSlots, isSegmentGranularityVisible)
if (reserveFailedWorkers.isEmpty) {
success = true
} else {
// Remove failed workers from the candidates during retry to avoid re-allocating slots on them.
candidates.removeAll(reserveFailedWorkers)
// Find all failed partition locations and remove the failed workers' partition locations
// from slots.
val failedPartitionLocations =
getFailedPartitionLocations(reserveFailedWorkers, slots)
// When replication is enabled, if reserving slots for a partition location failed, the
// corresponding peer partition location also needs to be released and removed from slots.
if (failedPartitionLocations.nonEmpty && !slots.isEmpty) {
releasePartitionLocation(shuffleId, slots, failedPartitionLocations)
}
if (pushReplicateEnabled && failedPartitionLocations.nonEmpty && !slots.isEmpty) {
releasePartitionLocation(shuffleId, slots, failedPartitionLocations, true)
}
if (retryTimes < reserveSlotsMaxRetries) {
// Build the retry candidate set and retry reserving buffers.
val retryCandidates = new util.HashSet(slots.keySet())
// Add the original candidates, since a revive may pass in slots containing only two workers.
retryCandidates.addAll(candidates)
// remove excluded workers from retryCandidates
retryCandidates.removeAll(
workerStatusTracker.excludedWorkers.keys().asScala.toList.asJava)
retryCandidates.removeAll(workerStatusTracker.shuttingWorkers.asScala.toList.asJava)
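// At least one candidate worker is required, or at least two when replication is enabled,
// since a partition's primary and replica must be placed on different workers.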
if (retryCandidates.size < 1 || (pushReplicateEnabled && retryCandidates.size < 2)) {
logError(s"Retry reserve slots for $shuffleId failed caused by not enough slots.")
noAvailableSlots = true
} else {
// Slots are re-allocated only when the LifecycleManager needs to retry the reservation.
// The newly allocated slots are merged into the total slots and will not duplicate the
// existing partition locations.
requestSlots = reallocateSlotsFromCandidates(
failedPartitionLocations.values.toList,
retryCandidates.asScala.toList,
updateEpoch)
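// Merge the newly allocated slots for the failed partitions into the overall slots map.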
requestSlots.asScala.foreach {
case (workerInfo, (retryPrimaryLocs, retryReplicaLocs)) =>
val (primaryPartitionLocations, replicaPartitionLocations) =
slots.computeIfAbsent(workerInfo, newLocationFunc)
primaryPartitionLocations.addAll(retryPrimaryLocs)
replicaPartitionLocations.addAll(retryReplicaLocs)
}
}
} else {
logError(s"Try reserve slots for $shuffleId failed after $reserveSlotsMaxRetries retry.")
}
}
retryTimes += 1
}
// If reservation failed after all retries, destroy all allocated buffers.
if (!success) {
// The partition locations of workers that failed to reserve slots, together with their
// peer partition locations, have already been removed from slots by
// [[getFailedPartitionLocations]] and [[releasePartitionLocation]]. The remaining entries
// in slots are all successfully reserved partition locations.
logWarning(s"Reserve buffers for $shuffleId still fail after retrying, clear buffers.")
destroySlotsWithRetry(shuffleId, slots)
} else {
logInfo(s"Reserve buffer success for shuffleId $shuffleId")
}
success
}
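// Illustrative usage sketch (not part of LifecycleManager): how a caller might drive the
// retry helper after slot allocation. `allocatedSlots`, `candidateWorkers` and `failShuffle`
// are hypothetical placeholders for the caller's own state and error handling.
//
//   val candidateWorkers = new util.HashSet[WorkerInfo](allocatedSlots.keySet())
//   val reserved = reserveSlotsWithRetry(shuffleId, candidateWorkers, allocatedSlots)
//   if (!reserved) {
//     // Any buffers reserved before the failure were already destroyed inside
//     // reserveSlotsWithRetry, so the caller only needs to report the failure.
//     failShuffle(shuffleId, "failed to reserve slots after retries")
//   }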