private def handleDestroy()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala [590:665]


  /**
   * Handles a request to destroy the given primary and replica partition files of a shuffle.
   *
   * If `partitionLocationInfo` does not contain `shuffleKey`, replies with
   * `SHUFFLE_NOT_REGISTERED`, echoing back the requested locations, and does nothing else.
   * Otherwise each location's file is cleaned through `storageManager`, the locations are
   * removed from `partitionLocationInfo`, and the slots reported by that removal are released
   * on `workerInfo`. Locations are removed even when cleaning their file failed.
   *
   * Replies with `SUCCESS` and two empty lists when every file was cleaned, otherwise with
   * `PARTIAL_SUCCESS` and the unique ids whose clean-up threw.
   *
   * @param context          call context used to send the reply
   * @param shuffleKey       key of the shuffle whose locations are destroyed
   * @param primaryLocations unique ids of primary locations to destroy; may be null or empty
   * @param replicaLocations unique ids of replica locations to destroy; may be null or empty
   */
  private def handleDestroy(
      context: RpcCallContext,
      shuffleKey: String,
      primaryLocations: jList[String],
      replicaLocations: jList[String]): Unit = {
    // check whether shuffleKey has registered
    if (!partitionLocationInfo.containsShuffle(shuffleKey)) {
      logWarning(s"Shuffle $shuffleKey not registered!")
      context.reply(
        DestroyWorkerSlotsResponse(
          StatusCode.SHUFFLE_NOT_REGISTERED,
          primaryLocations,
          replicaLocations))
      return
    }

    // Null-safe size: a null list is treated like an empty one, both for the destroy guards
    // below and for the log messages at the end.
    def sizeOf(locations: jList[String]): Int =
      if (locations == null) 0 else locations.size()

    // Cleans the file of every location in `locations` and returns the unique ids for which
    // cleaning threw. A failure on one location does not stop the remaining ones.
    def cleanFiles(
        locations: jList[String],
        mode: PartitionLocation.Mode,
        role: String): jArrayList[String] = {
      val failed = new jArrayList[String]()
      locations.asScala.foreach { uniqueId =>
        try {
          storageManager.cleanFile(shuffleKey, PartitionLocation.getFileName(uniqueId, mode))
        } catch {
          case e: Exception =>
            failed.add(uniqueId)
            logDebug(s"Destroy $role file $uniqueId for $shuffleKey failed.", e)
        }
      }
      failed
    }

    val primaryCount = sizeOf(primaryLocations)
    val replicaCount = sizeOf(replicaLocations)

    // destroy primary locations
    val failedPrimaries =
      if (primaryCount > 0) {
        val failed = cleanFiles(primaryLocations, PartitionLocation.Mode.PRIMARY, "primary")
        // remove primary locations from WorkerInfo
        val releasePrimaryLocations =
          partitionLocationInfo.removePrimaryPartitions(shuffleKey, primaryLocations)
        workerInfo.releaseSlots(shuffleKey, releasePrimaryLocations._1)
        failed
      } else {
        new jArrayList[String]()
      }

    // destroy replica locations
    val failedReplicas =
      if (replicaCount > 0) {
        val failed = cleanFiles(replicaLocations, PartitionLocation.Mode.REPLICA, "replica")
        // remove replica locations from worker info
        val releaseReplicaLocations =
          partitionLocationInfo.removeReplicaPartitions(shuffleKey, replicaLocations)
        workerInfo.releaseSlots(shuffleKey, releaseReplicaLocations._1)
        failed
      } else {
        new jArrayList[String]()
      }

    // reply
    if (failedPrimaries.isEmpty && failedReplicas.isEmpty) {
      logInfo(
        s"Destroy $primaryCount primary location and $replicaCount" +
          s" replica locations for $shuffleKey successfully.")
      context.reply(
        DestroyWorkerSlotsResponse(
          StatusCode.SUCCESS,
          List.empty.asJava,
          List.empty.asJava))
    } else {
      // The numerators are the counts of locations whose file clean-up failed.
      logInfo(s"Destroy ${failedPrimaries.size()}/$primaryCount primary location and" +
        s" ${failedReplicas.size()}/$replicaCount replica location for" +
        s" $shuffleKey PartialSuccess.")
      context.reply(
        DestroyWorkerSlotsResponse(
          StatusCode.PARTIAL_SUCCESS,
          failedPrimaries,
          failedReplicas))
    }
  }