def handleRequestSlots()

in master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala [872:982]


  /**
   * Handles a slot request for one shuffle: picks candidate workers, allocates partition
   * slots on them, records the allocation in the status system and replies to the caller.
   *
   * Exactly one reply is sent on each path through this method:
   *  - `WORKER_EXCLUDED` when no worker remains after exclusion (and tag filtering, if enabled);
   *  - `SLOT_NOT_AVAILABLE` when the allocator returns `null` or an empty result;
   *  - `SUCCESS` with the allocated `WorkerResource` otherwise.
   *
   * @param context      RPC call context used to send the `RequestSlotsResponse`
   * @param requestSlots the request carrying the application/shuffle ids, partition id list,
   *                     excluded workers, replication/rack-awareness flags and related options
   */
  def handleRequestSlots(context: RpcCallContext, requestSlots: RequestSlots): Unit = {
    val numReducers = requestSlots.partitionIdList.size()
    val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)

    var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
    if (conf.tagsEnabled) {
      availableWorkers = tagsManager.getTaggedWorkers(
        requestSlots.userIdentifier,
        requestSlots.tagsExpr,
        availableWorkers)
    }

    val numAvailableWorkers = availableWorkers.size()

    if (numAvailableWorkers == 0) {
      logError(s"Offer slots for $shuffleKey failed due to all workers are excluded!")
      context.reply(
        RequestSlotsResponse(StatusCode.WORKER_EXCLUDED, new WorkerResource(), requestSlots.packed))
      // Must stop here: Random.nextInt(0) below would throw IllegalArgumentException,
      // and the request has already been answered.
      return
    }

    // Number of workers to select: at least 2 when replicating (1 otherwise), capped by the
    // configured maximum and the request's own maxWorkers (when positive), and never more
    // than the number of available workers.
    val numWorkers = Math.min(
      Math.max(
        if (requestSlots.shouldReplicate) 2 else 1,
        if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
        else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
      numAvailableWorkers)
    // Take a contiguous window of numWorkers entries starting at a random index,
    // wrapping around to the beginning of the list when the window passes the end.
    val startIndex = Random.nextInt(numAvailableWorkers)
    val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
    selectedWorkers.addAll(availableWorkers.subList(
      startIndex,
      Math.min(numAvailableWorkers, startIndex + numWorkers)))
    if (startIndex + numWorkers > numAvailableWorkers) {
      selectedWorkers.addAll(availableWorkers.subList(
        0,
        startIndex + numWorkers - numAvailableWorkers))
    }
    // offer slots
    // The allocation is timed under MasterSource.OFFER_SLOTS_TIME and runs while holding
    // the monitor of statusSystem.workersMap.
    val slots =
      masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
        statusSystem.workersMap.synchronized {
          if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
            SlotsAllocator.offerSlotsLoadAware(
              selectedWorkers,
              requestSlots.partitionIdList,
              requestSlots.shouldReplicate,
              requestSlots.shouldRackAware,
              slotsAssignLoadAwareDiskGroupNum,
              slotsAssignLoadAwareDiskGroupGradient,
              loadAwareFlushTimeWeight,
              loadAwareFetchTimeWeight,
              requestSlots.availableStorageTypes)
          } else {
            SlotsAllocator.offerSlotsRoundRobin(
              selectedWorkers,
              requestSlots.partitionIdList,
              requestSlots.shouldReplicate,
              requestSlots.shouldRackAware,
              requestSlots.availableStorageTypes)
          }
        }
      }

    // slots may be null (handled just below), so guard before dereferencing it for logging.
    if (slots != null && log.isDebugEnabled()) {
      val distributions = SlotsAllocator.slotsToDiskAllocations(slots)
      logDebug(
        s"allocate slots for shuffle $shuffleKey ${slots.asScala.map(m => m._1.toUniqueId -> m._2)}" +
          s" distributions: ${distributions.asScala.map(m => m._1.toUniqueId -> m._2)}")
    }

    // reply false if offer slots failed
    if (slots == null || slots.isEmpty) {
      logError(s"Offer slots for $numReducers reducers of $shuffleKey failed!")
      context.reply(RequestSlotsResponse(
        StatusCode.SLOT_NOT_AVAILABLE,
        new WorkerResource(),
        requestSlots.packed))
      return
    }

    // register shuffle success, update status
    // The per-disk slot counts are keyed by each worker's unique id before being recorded.
    statusSystem.handleRequestSlots(
      shuffleKey,
      requestSlots.hostname,
      Utils.getSlotsPerDisk(slots.asInstanceOf[WorkerResource])
        .asScala.map { case (worker, slots) => worker.toUniqueId -> slots }.asJava,
      requestSlots.requestId)

    logInfo(s"Offer slots successfully for $numReducers reducers of $shuffleKey" +
      s" on ${slots.size()} workers.")

    // Add up to conf.masterSlotAssignExtraSlots available workers that received no slots,
    // each with empty location lists, starting at a random index and wrapping around.
    // This happens after the status system update, so these entries are not recorded there.
    val workersNotSelected = availableWorkers.asScala.filter(!slots.containsKey(_))
    val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots, workersNotSelected.size)
    if (offerSlotsExtraSize > 0) {
      var index = Random.nextInt(workersNotSelected.size)
      (1 to offerSlotsExtraSize).foreach(_ => {
        slots.put(
          workersNotSelected(index),
          (new util.ArrayList[PartitionLocation](), new util.ArrayList[PartitionLocation]()))
        index = (index + 1) % workersNotSelected.size
      })
      logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
    }

    if (authEnabled) {
      pushApplicationMetaToWorkers(requestSlots, slots)
    }
    context.reply(RequestSlotsResponse(
      StatusCode.SUCCESS,
      slots.asInstanceOf[WorkerResource],
      requestSlots.packed))
  }