in client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala [224:411]
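// Handles a batch of ChangePartitionRequests for the given shuffle: excludes workers
// reported as failed, gathers candidate workers (optionally requesting extra slots from
// the master when dynamic resource allocation is enabled), reserves new slots on the
// candidates, and finally replies success or failure to every pending requester.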
def handleRequestPartitions(
shuffleId: Int,
changePartitions: Array[ChangePartitionRequest],
isSegmentGranularityVisible: Boolean): Unit = {
val requestsMap = changePartitionRequests.get(shuffleId)
val changes = changePartitions.map { change =>
s"${change.shuffleId}-${change.partitionId}-${change.epoch}"
}.mkString("[", ",", "]")
logWarning(s"Batch handle change partition for $changes")
// Exclude all failed workers
if (changePartitions.exists(_.causes.isDefined) && !testRetryRevive) {
changePartitions.filter(_.causes.isDefined).foreach { changePartition =>
lifecycleManager.workerStatusTracker.excludeWorkerFromPartition(
shuffleId,
changePartition.oldPartition,
changePartition.causes.get)
}
}
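// The two helpers below remove fulfilled requests under the per-partition locks and reply
// to every pending context, so callers blocked on a revive are released in a single pass.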
// Remove requests together to reduce lock time.
def replySuccess(locations: Array[PartitionLocation]): Unit = {
val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
locations.map { location =>
locksForShuffle(location.getId % locksForShuffle.length).synchronized {
if (batchHandleChangePartitionEnabled) {
inBatchPartitions.get(shuffleId).remove(location.getId)
}
// Here one partition id can be removed more than once,
// so we need to filter out null results before replying.
location -> Option(requestsMap.remove(location.getId))
}
}.foreach { case (newLocation, requests) =>
requests.foreach(_.asScala.toList.foreach(req =>
req.context.reply(
req.partitionId,
StatusCode.SUCCESS,
Option(newLocation),
lifecycleManager.workerStatusTracker.workerAvailableByLocation(req.oldPartition))))
}
}
// Remove requests together to reduce lock time.
def replyFailure(status: StatusCode): Unit = {
changePartitions.map { changePartition =>
val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
locksForShuffle(changePartition.partitionId % locksForShuffle.length).synchronized {
if (batchHandleChangePartitionEnabled) {
inBatchPartitions.get(shuffleId).remove(changePartition.partitionId)
}
Option(requestsMap.remove(changePartition.partitionId))
}
}.foreach { requests =>
requests.foreach(_.asScala.toList.foreach(req =>
req.context.reply(
req.partitionId,
status,
None,
lifecycleManager.workerStatusTracker.workerAvailableByLocation(req.oldPartition))))
}
}
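// Build the candidate worker set, starting from the workers already serving this shuffle
// that the status tracker still considers available.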
val candidates = new util.HashSet[WorkerInfo]()
val newlyRequestedLocations = new WorkerResource()
val snapshotCandidates =
lifecycleManager
.workerSnapshots(shuffleId)
.asScala
.values
.map(_.workerInfo)
.filter(lifecycleManager.workerStatusTracker.workerAvailable)
.toSet
.asJava
candidates.addAll(snapshotCandidates)
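// With dynamic resource allocation enabled, ask the master for additional slots when there
// are too few candidates (fewer than one, or fewer than two with replication enabled) or
// when the ratio of unavailable workers reaches the configured threshold.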
if (dynamicResourceEnabled) {
val shuffleAllocatedWorkers = lifecycleManager.workerSnapshots(shuffleId).size()
val unavailableWorkerRatio = 1 - (snapshotCandidates.size * 1.0 / shuffleAllocatedWorkers)
if (candidates.size < 1 || (pushReplicateEnabled && candidates.size < 2)
|| (unavailableWorkerRatio >= dynamicResourceUnavailableFactor)) {
// get new available workers for the request partition ids
val partitionIds = new util.ArrayList[Integer](
changePartitions.map(_.partitionId).map(Integer.valueOf).toList.asJava)
// The partition id values are not important here; we only need the workers the master assigns.
val requestSlotsRes =
lifecycleManager.requestMasterRequestSlotsWithRetry(shuffleId, partitionIds)
requestSlotsRes.status match {
case StatusCode.REQUEST_FAILED =>
logInfo(s"ChangePartition requestSlots RPC request failed for $shuffleId!")
case StatusCode.SLOT_NOT_AVAILABLE =>
logInfo(s"ChangePartition requestSlots for $shuffleId failed, have no available slots.")
case StatusCode.SUCCESS =>
logDebug(
s"ChangePartition requestSlots request for workers Success! shuffleId: $shuffleId availableWorkers Info: ${requestSlotsRes.workerResource.keySet()}")
case StatusCode.WORKER_EXCLUDED =>
logInfo(s"ChangePartition requestSlots request for workers for $shuffleId failed due to all workers be excluded!")
case _ => // won't happen
throw new UnsupportedOperationException()
}
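// On success, record the newly granted workers, set up endpoints to them, and add the
// reachable ones to the candidate set; connection failures are fed back to the status tracker.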
if (requestSlotsRes.status.equals(StatusCode.SUCCESS)) {
requestSlotsRes.workerResource.keySet().asScala.foreach { workerInfo: WorkerInfo =>
newlyRequestedLocations.computeIfAbsent(workerInfo, lifecycleManager.newLocationFunc)
}
// SetupEndpoint for new Workers
val workersRequireEndpoints = new util.HashSet[WorkerInfo](
requestSlotsRes.workerResource.keySet()
.asScala
.filter(lifecycleManager.workerStatusTracker.workerAvailable)
.asJava)
val connectFailedWorkers = new ShuffleFailedWorkers()
lifecycleManager.setupEndpoints(
workersRequireEndpoints,
shuffleId,
connectFailedWorkers)
workersRequireEndpoints.removeAll(connectFailedWorkers.asScala.keys.toList.asJava)
candidates.addAll(workersRequireEndpoints)
// Update worker status
lifecycleManager.workerStatusTracker.recordWorkerFailure(connectFailedWorkers)
lifecycleManager.workerStatusTracker.removeFromExcludedWorkers(candidates)
}
}
}
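// Fail all pending requests if there are still not enough candidates
// (two are required when push replication is enabled).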
if (candidates.size < 1 || (pushReplicateEnabled && candidates.size < 2)) {
logError("[Update partition] failed for not enough candidates for revive.")
replyFailure(StatusCode.SLOT_NOT_AVAILABLE)
return
}
// PartitionSplit requests always contain oldPartition
val newlyAllocatedLocations =
reallocateChangePartitionRequestSlotsFromCandidates(
changePartitions.toList,
candidates.asScala.toList)
if (!lifecycleManager.reserveSlotsWithRetry(
shuffleId,
candidates,
newlyAllocatedLocations,
isSegmentGranularityVisible = isSegmentGranularityVisible)) {
logError(s"[Update partition] failed for $shuffleId.")
replyFailure(StatusCode.RESERVE_SLOTS_FAILED)
return
}
// newlyRequestedLocations is empty if dynamicResourceEnabled is false
newlyRequestedLocations.putAll(newlyAllocatedLocations)
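// Register the newly reserved locations in the shuffle's worker snapshots and collect
// the resulting locations so they can be returned to the requesters.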
val newPrimaryLocations = newlyRequestedLocations.asScala.flatMap {
case (workInfo, (primaryLocations, replicaLocations)) =>
// Add all re-allocated slots to worker snapshots.
val partitionLocationInfo = lifecycleManager.workerSnapshots(shuffleId).computeIfAbsent(
workInfo.toUniqueId,
new util.function.Function[String, ShufflePartitionLocationInfo] {
override def apply(workerId: String): ShufflePartitionLocationInfo = {
new ShufflePartitionLocationInfo(workInfo)
}
})
partitionLocationInfo.addPrimaryPartitions(primaryLocations)
partitionLocationInfo.addReplicaPartitions(replicaLocations)
lifecycleManager.updateLatestPartitionLocations(shuffleId, primaryLocations)
// A partition location can be null after calling reserveSlotsWithRetry().
val locations = (primaryLocations.asScala ++ replicaLocations.asScala.map(_.getPeer))
.distinct.filter(_ != null)
if (locations.nonEmpty) {
val changes = locations.map { partition =>
s"(partition ${partition.getId} epoch from ${partition.getEpoch - 1} to ${partition.getEpoch})"
}.mkString("[", ", ", "]")
logInfo(s"[Update partition] success for " +
s"shuffle $shuffleId, succeed partitions: " +
s"$changes.")
}
// TODO: should record the new partition locations and acknowledge them to the downstream tasks,
// for the scenario where the downstream task starts before the upstream task.
locations
}
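// Reply to all pending requesters with the newly reserved primary locations.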
replySuccess(newPrimaryLocations.toArray)
}