in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [1569:1659]
/**
 * Asks every worker in `slotsToDestroy` to destroy the listed primary/replica slots of
 * `shuffleId`, retrying requests that fail, return a non-SUCCESS status, or time out.
 *
 * Workers whose `endpoint` is null are skipped. Each request is attempted at most
 * `rpcMaxRetires` times, and a single attempt counts as timed out once it has been
 * pending longer than `conf.rpcAskTimeout`. Outstanding requests are polled every 50 ms
 * until all of them are finished or an overall budget of `rpcAskTimeout * rpcMaxRetires`
 * (counted in polling sleeps, not wall-clock time) is used up. Problems are logged,
 * never thrown.
 *
 * @param shuffleId      shuffle whose slots are destroyed
 * @param slotsToDestroy per-worker pair of (primary locations, replica locations)
 */
def destroySlotsWithRetry(
    shuffleId: Int,
    slotsToDestroy: WorkerResource): Unit = {
  val shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId)

  // Re-sends the request held by `status`, bumping its attempt counter and start time.
  def retryDestroy(status: DestroyFutureWithStatus, currentTime: Long): Unit = {
    status.retryTimes += 1
    status.startTime = currentTime
    // mock failure if mockDestroyFailure is true and this is not the last retry
    status.message.mockFailure =
      status.message.mockFailure && (status.retryTimes != rpcMaxRetires)
    status.future =
      status.endpoint.ask[DestroyWorkerSlotsResponse](status.message)
  }

  // Handles an unsuccessful attempt: re-sends it while attempts remain, otherwise logs
  // the final failure. Returns true when the request should be dropped from tracking.
  def retryOrGiveUp(
      status: DestroyFutureWithStatus,
      currentTime: Long,
      failure: String): Boolean = {
    if (status.retryTimes < rpcMaxRetires) {
      logError(s"Request ${status.message} to ${status.endpoint} $failure, will retry.")
      retryDestroy(status, currentTime)
      false
    } else {
      if (status.retryTimes == rpcMaxRetires) {
        logError(s"Request ${status.message} to ${status.endpoint} $failure, will not retry.")
      }
      true
    }
  }

  val startTime = System.currentTimeMillis()
  val futures = new util.LinkedList[DestroyFutureWithStatus]()
  slotsToDestroy.asScala.filter(_._1.endpoint != null).foreach {
    case (workerInfo, (primaryLocations, replicaLocations)) =>
      val primaryIds = primaryLocations.asScala.map(_.getUniqueId).asJava
      val replicaIds = replicaLocations.asScala.map(_.getUniqueId).asJava
      val destroy = DestroyWorkerSlots(shuffleKey, primaryIds, replicaIds, mockDestroyFailure)
      val future = workerInfo.endpoint.ask[DestroyWorkerSlotsResponse](destroy)
      futures.add(DestroyFutureWithStatus(future, destroy, workerInfo.endpoint, 1, startTime))
  }

  val timeout = conf.rpcAskTimeout.duration.toMillis
  // Only the polling sleeps below are subtracted from this budget.
  var remainingTime = timeout * rpcMaxRetires
  val delta = 50
  while (remainingTime > 0 && !futures.isEmpty) {
    val currentTime = System.currentTimeMillis()
    val iter = futures.iterator()
    while (iter.hasNext) {
      val futureWithStatus = iter.next()
      // Snapshot before a possible retry increments the counter, so logs show this attempt.
      val retryTimes = futureWithStatus.retryTimes
      val finished = futureWithStatus.future.value match {
        case Some(scala.util.Success(res)) if res.status == StatusCode.SUCCESS =>
          true
        case Some(scala.util.Success(res)) =>
          retryOrGiveUp(
            futureWithStatus,
            currentTime,
            s"return ${res.status} for $shuffleKey $retryTimes/$rpcMaxRetires")
        case Some(scala.util.Failure(e)) =>
          retryOrGiveUp(
            futureWithStatus,
            currentTime,
            s"failed $retryTimes/$rpcMaxRetires for $shuffleKey, reason: $e")
        case None if currentTime - futureWithStatus.startTime > timeout =>
          retryOrGiveUp(
            futureWithStatus,
            currentTime,
            s"failed $retryTimes/$rpcMaxRetires for $shuffleKey, reason: Timeout")
        case None =>
          // Still in flight and within this attempt's timeout.
          false
      }
      if (finished) {
        iter.remove()
      }
    }
    if (!futures.isEmpty) {
      Thread.sleep(delta)
      remainingTime -= delta
    }
  }

  if (!futures.isEmpty) {
    // The overall budget ran out while attempts were still pending; without this log
    // they would be abandoned without any trace.
    val pendingEndpoints = futures.asScala.map(_.endpoint).mkString(", ")
    logError(
      s"Destroy slots for $shuffleKey did not finish in time, giving up on " +
        s"${futures.size()} pending request(s) to: $pendingEndpoints")
  }
  futures.clear()
}