in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala [167:299]
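  /**
   * Handles a ReserveSlots request: creates partition data writers for the requested
   * primary and replica locations, registers them for the shuffle, and replies with
   * SUCCESS only when every requested slot could be reserved.
   */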
  private def handleReserveSlots(
      context: RpcCallContext,
      applicationId: String,
      shuffleId: Int,
      requestPrimaryLocs: jList[PartitionLocation],
      requestReplicaLocs: jList[PartitionLocation],
      splitThreshold: Long,
      splitMode: PartitionSplitMode,
      partitionType: PartitionType,
      rangeReadFilter: Boolean,
      userIdentifier: UserIdentifier,
      pushDataTimeout: Long,
      partitionSplitEnabled: Boolean,
      isSegmentGranularityVisible: Boolean): Unit = {
    val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
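    // Reject the request immediately if this worker is shutting down.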
    if (shutdown.get()) {
      val msg = "Current worker is shutting down!"
      logError(s"[handleReserveSlots] $msg")
      context.reply(ReserveSlotsResponse(StatusCode.WORKER_SHUTDOWN, msg))
      return
    }
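    // Reserving slots requires healthy local working dirs unless remote storage (HDFS/S3/OSS) is configured.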
    if (storageManager.healthyWorkingDirs().size <= 0 &&
      !conf.hasHDFSStorage && !conf.hasS3Storage && !conf.hasOssStorage) {
      val msg = "Local storage has no available dirs!"
      logError(s"[handleReserveSlots] $msg")
      context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))
      return
    }
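    // Create a data writer for each requested primary location. If the location has already
    // been registered for this shuffle, reuse it instead of creating a new writer.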
    val primaryLocs = new jArrayList[PartitionLocation]()
    try {
      for (ind <- 0 until requestPrimaryLocs.size()) {
        var location = partitionLocationInfo.getPrimaryLocation(
          shuffleKey,
          requestPrimaryLocs.get(ind).getUniqueId)
        if (location == null) {
          location = requestPrimaryLocs.get(ind)
          val writer = storageManager.createPartitionDataWriter(
            applicationId,
            shuffleId,
            location,
            splitThreshold,
            splitMode,
            partitionType,
            rangeReadFilter,
            userIdentifier,
            partitionSplitEnabled,
            isSegmentGranularityVisible)
          primaryLocs.add(new WorkingPartition(location, writer))
        } else {
          primaryLocs.add(location)
        }
      }
    } catch {
      case e: Exception =>
        logError(s"CreateWriter for $shuffleKey failed.", e)
    }
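    // Some primary writers could not be created: destroy the ones that were and fail the request.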
    if (primaryLocs.size() < requestPrimaryLocs.size()) {
      val msg = s"Not all primary partitions satisfied for $shuffleKey"
      logWarning(s"[handleReserveSlots] $msg, will destroy writers.")
      primaryLocs.asScala.foreach { partitionLocation =>
        val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
        fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
          s"reserving slots failed for $shuffleKey."))
      }
      context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
      return
    }
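    // Repeat the same process for the requested replica locations.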
    val replicaLocs = new jArrayList[PartitionLocation]()
    try {
      for (ind <- 0 until requestReplicaLocs.size()) {
        var location =
          partitionLocationInfo.getReplicaLocation(
            shuffleKey,
            requestReplicaLocs.get(ind).getUniqueId)
        if (location == null) {
          location = requestReplicaLocs.get(ind)
          val writer = storageManager.createPartitionDataWriter(
            applicationId,
            shuffleId,
            location,
            splitThreshold,
            splitMode,
            partitionType,
            rangeReadFilter,
            userIdentifier,
            partitionSplitEnabled,
            isSegmentGranularityVisible)
          replicaLocs.add(new WorkingPartition(location, writer))
        } else {
          replicaLocs.add(location)
        }
      }
    } catch {
      case e: Exception =>
        logError(s"CreateWriter for $shuffleKey failed.", e)
    }
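    // Some replica writers could not be created: destroy both the primary and replica writers
    // created so far before failing the request.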
    if (replicaLocs.size() < requestReplicaLocs.size()) {
      val msg = s"Not all replica partitions satisfied for $shuffleKey"
      logWarning(s"[handleReserveSlots] $msg, destroy writers.")
      primaryLocs.asScala.foreach { partitionLocation =>
        val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
        fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
          s"reserving slots failed for $shuffleKey."))
      }
      replicaLocs.asScala.foreach { partitionLocation =>
        val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
        fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
          s"reserving slots failed for $shuffleKey."))
      }
      context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
      return
    }
    // reserve success, update status
    partitionLocationInfo.addPrimaryPartitions(shuffleKey, primaryLocs)
    partitionLocationInfo.addReplicaPartitions(shuffleKey, replicaLocs)
    shufflePartitionType.put(shuffleKey, partitionType)
    shufflePushDataTimeout.put(
      shuffleKey,
      if (pushDataTimeout <= 0) defaultPushdataTimeout else pushDataTimeout)
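    // Record per-disk slot usage for this shuffle and bump the slots-allocated metric.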
    workerInfo.allocateSlots(
      shuffleKey,
      Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
    workerSource.incCounter(WorkerSource.SLOTS_ALLOCATED, primaryLocs.size() + replicaLocs.size())
    logInfo(s"Reserved ${primaryLocs.size()} primary location(s)" +
      s" and ${replicaLocs.size()} replica location(s) for $shuffleKey")
    if (log.isDebugEnabled()) {
      logDebug(s"primary: $primaryLocs\nreplica: $replicaLocs.")
    }
    context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
  }