in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [301:480]
/**
 * Handles a shuffle registration request end to end.
 *
 * Concurrent requests for the same shuffle are collapsed: the first request performs the
 * allocation while later ones are parked in `registeringShuffleRequest` and answered together
 * once the outcome is known. If the shuffle is already registered, the request is answered
 * immediately from `workerSnapshots`.
 *
 * The allocation itself proceeds in five steps: request slots from the Master, set up an RPC
 * endpoint for every offered worker, reserve the slots on the reachable workers, record the
 * allocation, and finally reply to every waiting request.
 *
 * @param context       call context of the request that triggered this invocation
 * @param shuffleId     id of the shuffle being registered
 * @param numMappers    number of mappers, forwarded to `commitManager.registerShuffle`
 * @param numPartitions number of partitions; slots are requested for ids `0 until numPartitions`
 * @param partitionId   only consulted for `PartitionType.MAP`, where it selects the locations
 *                      belonging to the requesting task; defaults to -1
 * @throws UnsupportedOperationException if the partition type is neither MAP nor REDUCE, or if
 *                                       the slot request returns a status that is not handled
 */
private def offerAndReserveSlots(
    context: RegisterCallContext,
    shuffleId: Int,
    numMappers: Int,
    numPartitions: Int,
    partitionId: Int = -1): Unit = {
  val partitionType = getPartitionType(shuffleId)

  registeringShuffleRequest.synchronized {
    if (registeringShuffleRequest.containsKey(shuffleId)) {
      // Another request for the same shuffle is already in flight: park this context so it is
      // answered together with the others, then return.
      logDebug(s"[handleRegisterShuffle] request for shuffle $shuffleId exists, just register")
      registeringShuffleRequest.get(shuffleId).add(context)
      return
    } else {
      // If the shuffle is already registered, reply with its partition locations and return.
      // Otherwise record this request in registeringShuffleRequest and carry on.
      if (registeredShuffle.contains(shuffleId)) {
        val initialLocs = workerSnapshots(shuffleId)
          .values()
          .asScala
          .flatMap(_.getAllPrimaryLocationsWithMinEpoch().asScala)
          .filter { p =>
            (partitionType == PartitionType.REDUCE && p.getEpoch == 0) ||
            (partitionType == PartitionType.MAP && p.getId == partitionId)
          }
          .toArray
        partitionType match {
          case PartitionType.MAP =>
            processMapTaskReply(
              shuffleId,
              context.context,
              partitionId,
              initialLocs)
          case PartitionType.REDUCE =>
            context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, initialLocs))
          case _ =>
            throw new UnsupportedOperationException(s"Not support $partitionType yet")
        }
        return
      }
      logInfo(s"New shuffle request, shuffleId $shuffleId, partitionType: $partitionType " +
        s"numMappers: $numMappers, numReducers: $numPartitions.")
      val set = new util.HashSet[RegisterCallContext]()
      set.add(context)
      registeringShuffleRequest.put(shuffleId, set)
    }
  }

  // Answers a single MAP-partition task: replies with the given locations when there are any,
  // otherwise asks changePartitionManager for a new location for this partition.
  def processMapTaskReply(
      shuffleId: Int,
      context: RpcCallContext,
      partitionId: Int,
      partitionLocations: Array[PartitionLocation]): Unit = {
    if (partitionLocations.nonEmpty) {
      // A location already exists for this task, just reply with it.
      context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, partitionLocations))
    } else {
      // Request a new resource for this task.
      changePartitionManager.handleRequestPartitionLocation(
        ApplyNewLocationCallContext(context),
        shuffleId,
        partitionId,
        -1,
        null)
    }
  }

  // Replies to every RegisterShuffle request parked for the current shuffle id and then drops
  // the pending entry.
  def reply(response: PbRegisterShuffleResponse): Unit = {
    registeringShuffleRequest.synchronized {
      try {
        registeringShuffleRequest.asScala
          .get(shuffleId)
          .foreach(_.asScala.foreach(pendingContext => {
            partitionType match {
              case PartitionType.MAP =>
                if (response.getStatus == StatusCode.SUCCESS.getValue) {
                  // Each map task only receives the locations of its own partition.
                  val partitionLocations =
                    response.getPartitionLocationsList.asScala
                      .filter(_.getId == pendingContext.partitionId)
                      .map(r => PbSerDeUtils.fromPbPartitionLocation(r))
                      .toArray
                  processMapTaskReply(
                    shuffleId,
                    pendingContext.context,
                    pendingContext.partitionId,
                    partitionLocations)
                } else {
                  // When registration did not succeed, forward the original response;
                  // otherwise the original failure status would be lost.
                  pendingContext.reply(response)
                }
              case PartitionType.REDUCE => pendingContext.reply(response)
              case _ =>
                throw new UnsupportedOperationException(s"Not support $partitionType yet")
            }
          }))
      } finally {
        // Always drop the pending entry, even if replying to one of the contexts threw.
        // A stale entry would make every later registration of this shuffle take the
        // "request exists" path above and wait for a reply that never comes.
        registeringShuffleRequest.remove(shuffleId)
      }
    }
  }

  // First, request slots from the Master.
  val ids = new util.ArrayList[Integer](numPartitions)
  (0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx)))
  val res = requestMasterRequestSlotsWithRetry(shuffleId, ids)

  res.status match {
    case StatusCode.REQUEST_FAILED =>
      logInfo(s"OfferSlots RPC request failed for $shuffleId!")
      reply(RegisterShuffleResponse(StatusCode.REQUEST_FAILED, Array.empty))
      return
    case StatusCode.SLOT_NOT_AVAILABLE =>
      logInfo(s"OfferSlots for $shuffleId failed!")
      reply(RegisterShuffleResponse(StatusCode.SLOT_NOT_AVAILABLE, Array.empty))
      return
    case StatusCode.SUCCESS =>
      logInfo(s"OfferSlots for $shuffleId Success!Slots Info: ${res.workerResource}")
    case _ => // won't happen
      // NOTE(review): on this path the parked contexts are neither replied to nor removed
      // from registeringShuffleRequest; confirm whether they should be failed explicitly.
      throw new UnsupportedOperationException(
        s"Unexpected status ${res.status} when requesting slots for shuffle $shuffleId")
  }

  // Reserve slots for each PartitionLocation. Per the original author's note, when the status
  // is SUCCESS the worker resource is not expected to be empty, because the Master answers
  // SLOT_NOT_AVAILABLE in that case.
  val slots = res.workerResource
  val candidatesWorkers = new util.HashSet(slots.keySet())
  val connectFailedWorkers = new ShuffleFailedWorkers()

  // Second, for each worker, try to initialize the endpoint.
  val parallelism = Math.min(Math.max(1, slots.size()), conf.clientRpcMaxParallelism)
  ThreadUtils.parmap(slots.asScala.to, "InitWorkerRef", parallelism) { case (workerInfo, _) =>
    try {
      workerInfo.endpoint =
        rpcEnv.setupEndpointRef(RpcAddress.apply(workerInfo.host, workerInfo.rpcPort), WORKER_EP)
    } catch {
      case t: Throwable =>
        logError(s"Init rpc client failed for $shuffleId on $workerInfo during reserve slots.", t)
        connectFailedWorkers.put(
          workerInfo,
          (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
    }
  }

  // Pass the key set directly: it avoids two intermediate copies and keeps removeAll on
  // hash lookups instead of linear List.contains scans.
  candidatesWorkers.removeAll(connectFailedWorkers.keySet())
  workerStatusTracker.recordWorkerFailure(connectFailedWorkers)
  // Workers that were newly offered by the Master and whose endpoint could be set up are
  // removed from the excluded worker list to keep that list accurate.
  workerStatusTracker.removeFromExcludedWorkers(candidatesWorkers)

  // Third, for each slot, LifecycleManager should ask the Worker to reserve the slot
  // and prepare the pushing data env.
  val reserveSlotsSuccess =
    reserveSlotsWithRetry(
      shuffleId,
      candidatesWorkers,
      slots,
      updateEpoch = false)

  if (!reserveSlotsSuccess) {
    // Reserving slots failed: reply RESERVE_SLOTS_FAILED to every waiting request.
    logError(s"reserve buffer for $shuffleId failed, reply to all.")
    reply(RegisterShuffleResponse(StatusCode.RESERVE_SLOTS_FAILED, Array.empty))
  } else {
    logInfo(s"ReserveSlots for $shuffleId success with details:$slots!")
    // Fourth, register shuffle success, update status.
    val allocatedWorkers =
      JavaUtils.newConcurrentHashMap[WorkerInfo, ShufflePartitionLocationInfo]()
    slots.asScala.foreach { case (workerInfo, (primaryLocations, replicaLocations)) =>
      val partitionLocationInfo = new ShufflePartitionLocationInfo()
      partitionLocationInfo.addPrimaryPartitions(primaryLocations)
      updateLatestPartitionLocations(shuffleId, primaryLocations)
      partitionLocationInfo.addReplicaPartitions(replicaLocations)
      allocatedWorkers.put(workerInfo, partitionLocationInfo)
    }
    shuffleAllocatedWorkers.put(shuffleId, allocatedWorkers)
    registeredShuffle.add(shuffleId)
    commitManager.registerShuffle(shuffleId, numMappers)

    // Fifth, reply the allocated primary partition locations to ShuffleClient.
    logInfo(s"Handle RegisterShuffle Success for $shuffleId.")
    val allPrimaryPartitionLocations = slots.asScala.flatMap(_._2._1.asScala).toArray
    reply(RegisterShuffleResponse(StatusCode.SUCCESS, allPrimaryPartitionLocations))
  }
}