in client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala [196:308]
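// Handles a batch of change-partition (revive/split) requests for one shuffle:
// excludes failed workers, allocates and reserves new slots on available candidate
// workers, then replies to every pending requester with the new primary locations.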
def handleRequestPartitions(
shuffleId: Int,
changePartitions: Array[ChangePartitionRequest]): Unit = {
val requestsMap = changePartitionRequests.get(shuffleId)
val changes = changePartitions.map { change =>
s"${change.shuffleId}-${change.partitionId}-${change.epoch}"
}.mkString("[", ",", "]")
logWarning(s"Batch handle change partition for $changes")
// Exclude all failed workers
if (changePartitions.exists(_.causes.isDefined)) {
changePartitions.filter(_.causes.isDefined).foreach { changePartition =>
lifecycleManager.workerStatusTracker.excludeWorkerFromPartition(
shuffleId,
changePartition.oldPartition,
changePartition.causes.get)
}
}
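// Helper: reply success to every pending requester of the given new locations.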
// Remove pending requests together to reduce lock time.
def replySuccess(locations: Array[PartitionLocation]): Unit = {
requestsMap.synchronized {
locations.map { location =>
if (batchHandleChangePartitionEnabled) {
inBatchPartitions.get(shuffleId).remove(location.getId)
}
// One partition id can be removed more than once here,
// so null results must be filtered out before replying.
location -> Option(requestsMap.remove(location.getId))
}
}.foreach { case (newLocation, requests) =>
requests.foreach(_.asScala.toList.foreach(req =>
req.context.reply(
req.partitionId,
StatusCode.SUCCESS,
Option(newLocation),
lifecycleManager.workerStatusTracker.workerAvailable(req.oldPartition))))
}
}
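// Helper: reply failure with the given status to every pending requester in this batch.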
// Remove pending requests together to reduce lock time.
def replyFailure(status: StatusCode): Unit = {
requestsMap.synchronized {
changePartitions.map { changePartition =>
if (batchHandleChangePartitionEnabled) {
inBatchPartitions.get(shuffleId).remove(changePartition.partitionId)
}
Option(requestsMap.remove(changePartition.partitionId))
}
}.foreach { requests =>
requests.foreach(_.asScala.toList.foreach(req =>
req.context.reply(
req.partitionId,
status,
None,
lifecycleManager.workerStatusTracker.workerAvailable(req.oldPartition))))
}
}
// Get candidate workers that are not in the excluded worker list of this shuffle.
val candidates =
lifecycleManager
.workerSnapshots(shuffleId)
.keySet()
.asScala
.filter(lifecycleManager.workerStatusTracker.workerAvailable)
.toList
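// Need at least one candidate worker, or two when replication is enabled (primary + replica).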
if (candidates.isEmpty || (pushReplicateEnabled && candidates.size < 2)) {
logError("[Update partition] failed: not enough candidates for revive.")
replyFailure(StatusCode.SLOT_NOT_AVAILABLE)
return
}
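// Allocate new slots for the changed partitions from the candidate workers.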
// Every PartitionSplit request carries its oldPartition.
val newlyAllocatedLocations =
reallocateChangePartitionRequestSlotsFromCandidates(changePartitions.toList, candidates)
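// Reserve the newly allocated slots on the workers; give up and reply failure
// if reservation does not succeed after retries.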
if (!lifecycleManager.reserveSlotsWithRetry(
shuffleId,
new util.HashSet(candidates.toSet.asJava),
newlyAllocatedLocations)) {
logError(s"[Update partition] failed for $shuffleId.")
replyFailure(StatusCode.RESERVE_SLOTS_FAILED)
return
}
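// Record the reserved slots in the shuffle's worker snapshots and collect the
// new primary locations to reply with.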
val newPrimaryLocations =
newlyAllocatedLocations.asScala.flatMap {
case (workInfo, (primaryLocations, replicaLocations)) =>
// Add all re-allocated slots to worker snapshots.
lifecycleManager.workerSnapshots(shuffleId).asScala
.get(workInfo)
.foreach { partitionLocationInfo =>
partitionLocationInfo.addPrimaryPartitions(primaryLocations)
lifecycleManager.updateLatestPartitionLocations(shuffleId, primaryLocations)
partitionLocationInfo.addReplicaPartitions(replicaLocations)
}
// A partition location can be null after calling reserveSlotsWithRetry(), so filter out nulls.
val locations = (primaryLocations.asScala ++ replicaLocations.asScala.map(_.getPeer))
.distinct.filter(_ != null)
if (locations.nonEmpty) {
val changes = locations.map { partition =>
s"(partition ${partition.getId} epoch from ${partition.getEpoch - 1} to ${partition.getEpoch})"
}.mkString("[", ", ", "]")
logInfo(s"[Update partition] success for " +
s"shuffle $shuffleId, succeed partitions: " +
s"$changes.")
}
locations
}
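// Reply to all pending requesters with the newly allocated primary locations.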
replySuccess(newPrimaryLocations.toArray)
}