in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala [590:665]
private def handleDestroy(
context: RpcCallContext,
shuffleKey: String,
primaryLocations: jList[String],
replicaLocations: jList[String]): Unit = {
// check whether shuffleKey has registered
if (!partitionLocationInfo.containsShuffle(shuffleKey)) {
logWarning(s"Shuffle $shuffleKey not registered!")
context.reply(
DestroyWorkerSlotsResponse(
StatusCode.SHUFFLE_NOT_REGISTERED,
primaryLocations,
replicaLocations))
return
}
val failedPrimaries = new jArrayList[String]()
val failedReplicas = new jArrayList[String]()
// destroy primary locations
if (primaryLocations != null && !primaryLocations.isEmpty) {
primaryLocations.asScala.foreach { uniqueId =>
try {
storageManager.cleanFile(
shuffleKey,
PartitionLocation.getFileName(uniqueId, PartitionLocation.Mode.PRIMARY))
} catch {
case e: Exception =>
failedPrimaries.add(uniqueId)
logDebug(s"Destroy primary file $uniqueId for $shuffleKey failed.", e)
}
}
// remove primary locations from WorkerInfo
val releasePrimaryLocations =
partitionLocationInfo.removePrimaryPartitions(shuffleKey, primaryLocations)
workerInfo.releaseSlots(shuffleKey, releasePrimaryLocations._1)
}
// destroy replica locations
if (replicaLocations != null && !replicaLocations.isEmpty) {
replicaLocations.asScala.foreach { uniqueId =>
try {
storageManager.cleanFile(
shuffleKey,
PartitionLocation.getFileName(uniqueId, PartitionLocation.Mode.REPLICA))
} catch {
case e: Exception =>
failedReplicas.add(uniqueId)
logDebug(s"Destroy replica file $uniqueId for $shuffleKey failed.", e)
}
}
// remove replica locations from worker info
val releaseReplicaLocations =
partitionLocationInfo.removeReplicaPartitions(shuffleKey, replicaLocations)
workerInfo.releaseSlots(shuffleKey, releaseReplicaLocations._1)
}
// reply
if (failedPrimaries.isEmpty && failedReplicas.isEmpty) {
logInfo(
s"Destroy ${primaryLocations.size()} primary location and ${replicaLocations.size()}" +
s" replica locations for $shuffleKey successfully.")
context.reply(
DestroyWorkerSlotsResponse(
StatusCode.SUCCESS,
List.empty.asJava,
List.empty.asJava))
} else {
logInfo(s"Destroy ${failedPrimaries.size()}/${primaryLocations.size()} primary location and" +
s"${failedReplicas.size()}/${replicaLocations.size()} replica location for" +
s" $shuffleKey PartialSuccess.")
context.reply(
DestroyWorkerSlotsResponse(
StatusCode.PARTIAL_SUCCESS,
failedPrimaries,
failedReplicas))
}
}