in master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala [555:631]
/**
 * Handles a `RequestSlots` RPC.
 *
 * Picks a random subset of the currently available workers, asks [[SlotsAllocator]] to place the
 * requested partitions on them, records the allocation through `statusSystem` and replies to the
 * caller.
 *
 * Replies with `StatusCode.SLOT_NOT_AVAILABLE` and an empty `WorkerResource` when the allocator
 * returns `null` or an empty allocation. Otherwise replies with `StatusCode.SUCCESS` and the
 * allocation, to which up to `conf.masterSlotAssignExtraSlots` unused workers are added with
 * empty partition-location lists.
 *
 * @param context      RPC context used to send the reply
 * @param requestSlots request carrying the application/shuffle ids, the partition id list and the
 *                     replication, rack-awareness and max-worker settings
 */
def handleRequestSlots(context: RpcCallContext, requestSlots: RequestSlots): Unit = {
  val numReducers = requestSlots.partitionIdList.size()
  val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)

  // Shuffle in place so that the prefix taken below is a random subset of the available workers.
  val shuffledWorkers = workersAvailable()
  Collections.shuffle(shuffledWorkers)
  // Worker budget: at least 2 when replicating (1 otherwise), capped by slotsAssignMaxWorkers and,
  // when positive, by the requested maxWorkers; never more than the workers actually available.
  val numWorkers = Math.min(
    Math.max(
      if (requestSlots.shouldReplicate) 2 else 1,
      if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
      else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
    shuffledWorkers.size())
  val availableWorkers = shuffledWorkers.subList(0, numWorkers)

  // offer slots
  val slots =
    masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
      statusSystem.workers.synchronized {
        if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) {
          SlotsAllocator.offerSlotsLoadAware(
            availableWorkers,
            requestSlots.partitionIdList,
            requestSlots.shouldReplicate,
            requestSlots.shouldRackAware,
            diskReserveSize,
            slotsAssignLoadAwareDiskGroupNum,
            slotsAssignLoadAwareDiskGroupGradient,
            loadAwareFlushTimeWeight,
            loadAwareFetchTimeWeight)
        } else {
          SlotsAllocator.offerSlotsRoundRobin(
            availableWorkers,
            requestSlots.partitionIdList,
            requestSlots.shouldReplicate,
            requestSlots.shouldRackAware)
        }
      }
    }

  // `slots` is treated as possibly null by the failure check below, so it must not be handed to
  // slotsToDiskAllocations unguarded: a failure there would skip the SLOT_NOT_AVAILABLE reply.
  if (slots != null && log.isDebugEnabled()) {
    val distributions = SlotsAllocator.slotsToDiskAllocations(slots)
    logDebug(s"allocate slots for shuffle $shuffleKey $slots" +
      s" distributions: ${distributions.asScala.map(m => m._1.toUniqueId() -> m._2)}")
  }

  // reply false if offer slots failed
  if (slots == null || slots.isEmpty) {
    logError(s"Offer slots for $numReducers reducers of $shuffleKey failed!")
    context.reply(RequestSlotsResponse(StatusCode.SLOT_NOT_AVAILABLE, new WorkerResource()))
    return
  }

  // register shuffle success, update status
  statusSystem.handleRequestSlots(
    shuffleKey,
    requestSlots.hostname,
    Utils.getSlotsPerDisk(slots.asInstanceOf[WorkerResource])
      .asScala.map { case (worker, perDisk) => worker.toUniqueId() -> perDisk }.asJava,
    requestSlots.requestId)
  logInfo(s"Offer slots successfully for $numReducers reducers of $shuffleKey" +
    s" on ${slots.size()} workers.")

  // Add some workers that received no partitions, each with empty location lists.
  // NOTE(review): candidates come from the truncated `availableWorkers` list, so extras exist only
  // when the allocator left some of the selected workers unused - confirm this is intended.
  val workersNotSelected = availableWorkers.asScala.filter(!slots.containsKey(_))
  val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots, workersNotSelected.size)
  if (offerSlotsExtraSize > 0) {
    // Take consecutive candidates starting at a random position, wrapping around the end.
    val start = Random.nextInt(workersNotSelected.size)
    (0 until offerSlotsExtraSize).foreach { offset =>
      slots.put(
        workersNotSelected((start + offset) % workersNotSelected.size),
        (new util.ArrayList[PartitionLocation](), new util.ArrayList[PartitionLocation]()))
    }
    logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
  }

  context.reply(RequestSlotsResponse(StatusCode.SUCCESS, slots.asInstanceOf[WorkerResource]))
}