private def handleReserveSlots()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala [167:299]


  private def handleReserveSlots(
      context: RpcCallContext,
      applicationId: String,
      shuffleId: Int,
      requestPrimaryLocs: jList[PartitionLocation],
      requestReplicaLocs: jList[PartitionLocation],
      splitThreshold: Long,
      splitMode: PartitionSplitMode,
      partitionType: PartitionType,
      rangeReadFilter: Boolean,
      userIdentifier: UserIdentifier,
      pushDataTimeout: Long,
      partitionSplitEnabled: Boolean,
      isSegmentGranularityVisible: Boolean): Unit = {
    val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
    if (shutdown.get()) {
      val msg = "Current worker is shutting down!"
      logError(s"[handleReserveSlots] $msg")
      context.reply(ReserveSlotsResponse(StatusCode.WORKER_SHUTDOWN, msg))
      return
    }

    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage && !conf.hasS3Storage && !conf.hasOssStorage) {
      val msg = "Local storage has no available dirs!"
      logError(s"[handleReserveSlots] $msg")
      context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))
      return
    }
    val primaryLocs = new jArrayList[PartitionLocation]()
    try {
      for (ind <- 0 until requestPrimaryLocs.size()) {
        var location = partitionLocationInfo.getPrimaryLocation(
          shuffleKey,
          requestPrimaryLocs.get(ind).getUniqueId)
        if (location == null) {
          location = requestPrimaryLocs.get(ind)
          val writer = storageManager.createPartitionDataWriter(
            applicationId,
            shuffleId,
            location,
            splitThreshold,
            splitMode,
            partitionType,
            rangeReadFilter,
            userIdentifier,
            partitionSplitEnabled,
            isSegmentGranularityVisible)
          primaryLocs.add(new WorkingPartition(location, writer))
        } else {
          primaryLocs.add(location)
        }
      }
    } catch {
      case e: Exception =>
        logError(s"CreateWriter for $shuffleKey failed.", e)
    }
    if (primaryLocs.size() < requestPrimaryLocs.size()) {
      val msg = s"Not all primary partition satisfied for $shuffleKey"
      logWarning(s"[handleReserveSlots] $msg, will destroy writers.")
      primaryLocs.asScala.foreach { partitionLocation =>
        val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
        fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
          s"reserving slots failed for $shuffleKey."))
      }
      context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
      return
    }

    val replicaLocs = new jArrayList[PartitionLocation]()
    try {
      for (ind <- 0 until requestReplicaLocs.size()) {
        var location =
          partitionLocationInfo.getReplicaLocation(
            shuffleKey,
            requestReplicaLocs.get(ind).getUniqueId)
        if (location == null) {
          location = requestReplicaLocs.get(ind)
          val writer = storageManager.createPartitionDataWriter(
            applicationId,
            shuffleId,
            location,
            splitThreshold,
            splitMode,
            partitionType,
            rangeReadFilter,
            userIdentifier,
            partitionSplitEnabled,
            isSegmentGranularityVisible)
          replicaLocs.add(new WorkingPartition(location, writer))
        } else {
          replicaLocs.add(location)
        }
      }
    } catch {
      case e: Exception =>
        logError(s"CreateWriter for $shuffleKey failed.", e)
    }
    if (replicaLocs.size() < requestReplicaLocs.size()) {
      val msg = s"Not all replica partition satisfied for $shuffleKey"
      logWarning(s"[handleReserveSlots] $msg, destroy writers.")
      primaryLocs.asScala.foreach { partitionLocation =>
        val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
        fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
          s"reserving slots failed for $shuffleKey."))
      }
      replicaLocs.asScala.foreach { partitionLocation =>
        val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
        fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
          s"reserving slots failed for $shuffleKey."))
      }
      context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
      return
    }

    // reserve success, update status
    partitionLocationInfo.addPrimaryPartitions(shuffleKey, primaryLocs)
    partitionLocationInfo.addReplicaPartitions(shuffleKey, replicaLocs)
    shufflePartitionType.put(shuffleKey, partitionType)
    shufflePushDataTimeout.put(
      shuffleKey,
      if (pushDataTimeout <= 0) defaultPushdataTimeout else pushDataTimeout)
    workerInfo.allocateSlots(
      shuffleKey,
      Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
    workerSource.incCounter(WorkerSource.SLOTS_ALLOCATED, primaryLocs.size() + replicaLocs.size())

    logInfo(s"Reserved ${primaryLocs.size()} primary location" +
      s" and ${replicaLocs.size()} replica location for $shuffleKey ")
    if (log.isDebugEnabled()) {
      logDebug(s"primary: $primaryLocs\nreplica: $replicaLocs.")
    }
    context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
  }