in master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala [872:982]
/**
 * Handles a [[RequestSlots]] RPC: picks candidate workers, asks [[SlotsAllocator]] to assign
 * slots for the requested partitions, records the assignment in `statusSystem`, and replies
 * to the caller.
 *
 * Reply status:
 *  - `WORKER_EXCLUDED` when no worker remains after exclusion (and tag filtering, if enabled);
 *  - `SLOT_NOT_AVAILABLE` when the allocator returns null or an empty assignment;
 *  - `SUCCESS` with the allocated slots otherwise.
 *
 * @param context      RPC call context used to send the reply
 * @param requestSlots the slot request
 */
def handleRequestSlots(context: RpcCallContext, requestSlots: RequestSlots): Unit = {
  val numReducers = requestSlots.partitionIdList.size()
  val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)

  var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
  if (conf.tagsEnabled) {
    availableWorkers = tagsManager.getTaggedWorkers(
      requestSlots.userIdentifier,
      requestSlots.tagsExpr,
      availableWorkers)
  }

  val numAvailableWorkers = availableWorkers.size()
  if (numAvailableWorkers == 0) {
    logError(s"Offer slots for $shuffleKey failed due to all workers are excluded!")
    context.reply(
      RequestSlotsResponse(StatusCode.WORKER_EXCLUDED, new WorkerResource(), requestSlots.packed))
    // Must stop here: Random.nextInt(0) below throws IllegalArgumentException, and the
    // request has already been answered.
    return
  }

  // Number of workers to hand to the allocator: at least 2 when replicating (1 otherwise),
  // bounded by the configured/requested maximum, and never more than what is available.
  val numWorkers = Math.min(
    Math.max(
      if (requestSlots.shouldReplicate) 2 else 1,
      if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
      else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
    numAvailableWorkers)

  // Take `numWorkers` consecutive workers starting at a random index, wrapping around the
  // end of the list when necessary.
  val startIndex = Random.nextInt(numAvailableWorkers)
  val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
  selectedWorkers.addAll(availableWorkers.subList(
    startIndex,
    Math.min(numAvailableWorkers, startIndex + numWorkers)))
  if (startIndex + numWorkers > numAvailableWorkers) {
    selectedWorkers.addAll(availableWorkers.subList(
      0,
      startIndex + numWorkers - numAvailableWorkers))
  }

  // offer slots
  val slots =
    masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
      statusSystem.workersMap.synchronized {
        if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
          SlotsAllocator.offerSlotsLoadAware(
            selectedWorkers,
            requestSlots.partitionIdList,
            requestSlots.shouldReplicate,
            requestSlots.shouldRackAware,
            slotsAssignLoadAwareDiskGroupNum,
            slotsAssignLoadAwareDiskGroupGradient,
            loadAwareFlushTimeWeight,
            loadAwareFetchTimeWeight,
            requestSlots.availableStorageTypes)
        } else {
          SlotsAllocator.offerSlotsRoundRobin(
            selectedWorkers,
            requestSlots.partitionIdList,
            requestSlots.shouldReplicate,
            requestSlots.shouldRackAware,
            requestSlots.availableStorageTypes)
        }
      }
    }

  if (log.isDebugEnabled()) {
    val distributions = SlotsAllocator.slotsToDiskAllocations(slots)
    logDebug(
      s"allocate slots for shuffle $shuffleKey ${slots.asScala.map(m => m._1.toUniqueId -> m._2)}" +
        s" distributions: ${distributions.asScala.map(m => m._1.toUniqueId -> m._2)}")
  }

  // reply false if offer slots failed
  if (slots == null || slots.isEmpty) {
    logError(s"Offer slots for $numReducers reducers of $shuffleKey failed!")
    context.reply(RequestSlotsResponse(
      StatusCode.SLOT_NOT_AVAILABLE,
      new WorkerResource(),
      requestSlots.packed))
    return
  }

  // register shuffle success, update status
  statusSystem.handleRequestSlots(
    shuffleKey,
    requestSlots.hostname,
    Utils.getSlotsPerDisk(slots.asInstanceOf[WorkerResource])
      .asScala.map { case (worker, slots) => worker.toUniqueId -> slots }.asJava,
    requestSlots.requestId)

  logInfo(s"Offer slots successfully for $numReducers reducers of $shuffleKey" +
    s" on ${slots.size()} workers.")

  // Add up to `conf.masterSlotAssignExtraSlots` workers that received no slots, each with
  // empty partition-location lists, starting from a random position among them.
  val workersNotSelected = availableWorkers.asScala.filter(!slots.containsKey(_))
  val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots, workersNotSelected.size)
  if (offerSlotsExtraSize > 0) {
    var index = Random.nextInt(workersNotSelected.size)
    (1 to offerSlotsExtraSize).foreach(_ => {
      slots.put(
        workersNotSelected(index),
        (new util.ArrayList[PartitionLocation](), new util.ArrayList[PartitionLocation]()))
      index = (index + 1) % workersNotSelected.size
    })
    logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
  }

  if (authEnabled) {
    pushApplicationMetaToWorkers(requestSlots, slots)
  }

  context.reply(RequestSlotsResponse(
    StatusCode.SUCCESS,
    slots.asInstanceOf[WorkerResource],
    requestSlots.packed))
}