in core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala [488:736]
/**
 * Matches worker resource offers against the pending task sets and assigns tasks to them.
 *
 * Steps, in order:
 *  - Records every host/executor seen for the first time (`hostToExecutors`,
 *    `executorIdToHost`, `executorIdToRunningTaskIds`, `executorAdded`) and remembers the rack
 *    of each offered host.
 *  - Applies the exclude-on-failure timeout, drops offers from excluded nodes/executors and
 *    shuffles the remaining offers.
 *  - Walks the task sets in `rootPool` scheduling order and, for each one, repeatedly offers
 *    resources at every level of `myLocalityLevels` until no further task launches at that level.
 *  - Barrier task sets are all-or-nothing: they are skipped when fewer slots than tasks are
 *    available, and a partial assignment is reverted (or, with the legacy locality-wait reset,
 *    the task set is aborted).
 *  - A task set that launched nothing and has a completely excluded task triggers the
 *    idle-executor kill / dynamic-allocation notification / immediate abort handling.
 *
 * The whole method runs `synchronized` on this scheduler. When legacy locality-wait reset is
 * enabled and only part of a barrier stage received offers, the task set is aborted and the
 * exception built by `SparkCoreErrors.sparkError` is thrown.
 *
 * @param offers resources offered by executors
 * @param isAllFreeResources when true, a task set that launched a task without any
 *                           delay-scheduling reject always gets its delay-scheduling timer reset;
 *                           when false, the reset additionally requires that no reject was
 *                           recorded for that task set since its last reset
 * @return one task list per offer that survived exclusion filtering, ordered like the internally
 *         shuffled offer list (NOT like `offers`)
 */
def resourceOffers(
    offers: IndexedSeq[WorkerOffer],
    isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each worker as alive and remember its hostname.
  // Also track whether a new executor was added, so every task set can be told below.
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
  }
  val hosts = offers.map(_.host).distinct
  // Hosts whose rack is unknown (None) are simply not recorded in hostsByRack.
  for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
  }
  // Before making any offers, include any nodes whose expireOnFailure timeout has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the excluded executors and nodes is only relevant when task offers are being made.
  healthTrackerOpt.foreach(_.applyExcludeOnFailureTimeout())
  val filteredOffers = healthTrackerOpt.map { healthTracker =>
    offers.filter { offer =>
      !healthTracker.isNodeExcluded(offer.host) &&
        !healthTracker.isExecutorExcluded(offer.executorId)
    }
  }.getOrElse(offers)
  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker. tasks(i) belongs to shuffledOffers(i).
  // Note the size estimate here might be off with different ResourceProfiles but should be
  // close estimate
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableResources = shuffledOffers.map(_.resources).toArray
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val resourceProfileIds = shuffledOffers.map(o => o.resourceProfileId).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // Take each TaskSet in our scheduling order, and then offer it to each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    // we only need to calculate available slots if using barrier scheduling, otherwise the
    // value is -1
    val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
      val rpId = taskSet.taskSet.resourceProfileId
      val resAmounts = availableResources.map(_.resourceAddressAmount)
      calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus, resAmounts)
    } else {
      -1
    }
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && numBarrierSlotsAvailable < taskSet.numTasks) {
      // Skip the launch process.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(log"Skip current round of resource offers for barrier stage " +
        log"${MDC(LogKeys.STAGE_ID, taskSet.stageId)} because the barrier taskSet requires " +
        log"${MDC(LogKeys.NUM_TASKS, taskSet.numTasks)} slots, while the total " +
        log"number of available slots is ${MDC(LogKeys.NUM_SLOTS, numBarrierSlotsAvailable)}.")
    } else {
      var launchedAnyTask = false
      var noDelaySchedulingRejects = true
      var globalMinLocality: Option[TaskLocality] = None
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        // Keep offering at this locality level while each round still launches something.
        do {
          val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks)
          launchedTaskAtCurrentMaxLocality = minLocality.isDefined
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          noDelaySchedulingRejects &= noDelayScheduleReject
          globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!legacyLocalityWaitReset) {
        if (noDelaySchedulingRejects) {
          if (launchedAnyTask &&
            (isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
            taskSet.resetDelayScheduleTimer(globalMinLocality)
            noRejectsSinceLastReset.update(taskSet.taskSet, true)
          }
        } else {
          noRejectsSinceLastReset.update(taskSet.taskSet, false)
        }
      }
      if (!launchedAnyTask) {
        taskSet.getCompletelyExcludedTaskIfAny(hostToExecutors).foreach { taskIndex =>
          // If the taskSet is unschedulable we try to find an existing idle excluded
          // executor and kill the idle executor and kick off an abortTimer which if it doesn't
          // schedule a task within the timeout will abort the taskSet if we were unable to
          // schedule any task from the taskSet.
          // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
          // task basis.
          // Note 2: The taskSet can still be aborted when there are more than one idle
          // excluded executors and dynamic allocation is on. This can happen when a killed
          // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
          // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
          // timer to expire and abort the taskSet.
          //
          // If there are no idle executors and dynamic allocation is enabled, then we would
          // notify ExecutorAllocationManager to allocate more executors to schedule the
          // unschedulable tasks else we will abort immediately.
          executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
            case Some((executorId, _)) =>
              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                healthTrackerOpt.foreach(blt => blt.killExcludedIdleExecutor(executorId))
                updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
              }
            case None =>
              // Notify ExecutorAllocationManager about the unschedulable task set,
              // in order to provision more executors to make them schedulable
              if (Utils.isDynamicAllocationEnabled(conf)) {
                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                  logInfo(log"Notifying ExecutorAllocationManager to allocate more executors to" +
                    log" schedule the unschedulable task before aborting" +
                    log" stage ${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.")
                  dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId,
                    taskSet.taskSet.stageAttemptId)
                  updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
                }
              } else {
                // Abort Immediately
                logInfo(log"Cannot schedule any task because all executors excluded from " +
                  log"failures. No idle executors can be found to kill. Aborting stage " +
                  log"${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.")
                taskSet.abortSinceCompletelyExcludedOnFailure(taskIndex)
              }
          }
        }
      } else {
        // We want to defer killing any taskSets as long as we have a non excluded executor
        // which can be used to schedule a task from any active taskSets. This ensures that the
        // job can make progress.
        // Note: It is theoretically possible that a taskSet never gets scheduled on a
        // non-excluded executor and the abort timer doesn't kick in because of a constant
        // submission of new TaskSets. See the PR for more details.
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo(log"Clearing the expiry times for all unschedulable taskSets as a task " +
            log"was recently scheduled.")
          // Notify ExecutorAllocationManager as well as other subscribers that a task now
          // recently becomes schedulable
          // NOTE(review): only the current taskSet's stage attempt is reported as removed, while
          // every entry of unschedulableTaskSetToExpiryTime is cleared below -- confirm that
          // subscribers do not need one notification per cleared task set.
          dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId,
            taskSet.taskSet.stageAttemptId)
          unschedulableTaskSetToExpiryTime.clear()
        }
      }
      if (launchedAnyTask && taskSet.isBarrier) {
        val barrierPendingLaunchTasks = taskSet.barrierPendingLaunchTasks.values.toArray
        // Check whether the barrier tasks are partially launched.
        if (barrierPendingLaunchTasks.length != taskSet.numTasks) {
          if (legacyLocalityWaitReset) {
            // Legacy delay scheduling always reset the timer when there's a task that is able
            // to be scheduled. Thus, whenever there's a timer reset could happen during a single
            // round resourceOffer, tasks that don't get or have the preferred locations would
            // always reject the offered resources. As a result, the barrier taskset can't get
            // launched. And if we retry the resourceOffer, we'd go through the same path again
            // and get into the endless loop in the end.
            val logMsg = log"Fail resource offers for barrier stage " +
              log"${MDC(STAGE_ID, taskSet.stageId)} because only " +
              log"${MDC(NUM_PENDING_LAUNCH_TASKS, barrierPendingLaunchTasks.length)} " +
              log"out of a total number " +
              log"of ${MDC(NUM_TASKS, taskSet.numTasks)} tasks got resource offers. " +
              log"We highly recommend you to use the non-legacy delay scheduling by setting " +
              log"${MDC(CONFIG, LEGACY_LOCALITY_WAIT_RESET.key)} to false " +
              log"to get rid of this error."
            logWarning(logMsg)
            taskSet.abort(logMsg.message)
            throw SparkCoreErrors.sparkError(logMsg.message)
          } else {
            val curTime = clock.getTimeMillis()
            // Rate-limit this message to once per BARRIER_LOGGING_INTERVAL per task set.
            if (curTime - taskSet.lastResourceOfferFailLogTime >
              TaskSetManager.BARRIER_LOGGING_INTERVAL) {
              logInfo(log"Releasing the assigned resource offers since only partial tasks can " +
                log"be launched. Waiting for later round resource offers.")
              taskSet.lastResourceOfferFailLogTime = curTime
            }
            barrierPendingLaunchTasks.foreach { task =>
              // revert all assigned resources
              availableCpus(task.assignedOfferIndex) += task.assignedCores
              availableResources(task.assignedOfferIndex).release(
                task.assignedResources)
              // re-add the task to the schedule pending list
              taskSet.addPendingTask(task.index)
            }
          }
        } else {
          // All tasks are able to launch in this barrier task set. Let's do
          // some preparation work before launching them.
          val launchTime = clock.getTimeMillis()
          val addressesWithDescs = barrierPendingLaunchTasks.map { task =>
            val taskDesc = taskSet.prepareLaunchingTask(
              task.execId,
              task.host,
              task.index,
              task.taskLocality,
              false,
              task.assignedCores,
              task.assignedResources,
              launchTime)
            addRunningTask(taskDesc.taskId, taskDesc.executorId, taskSet)
            tasks(task.assignedOfferIndex) += taskDesc
            shuffledOffers(task.assignedOfferIndex).address.get -> taskDesc
          }
          // materialize the barrier coordinator.
          maybeInitBarrierCoordinator()
          // Update the taskInfos into all the barrier task properties.
          val addressesStr = addressesWithDescs
            // Addresses ordered by partitionId
            .sortBy(_._2.partitionId)
            .map(_._1)
            .mkString(",")
          addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
          logInfo(log"Successfully scheduled all the " +
            log"${MDC(LogKeys.NUM_TASKS, addressesWithDescs.length)} " +
            log"tasks for barrier stage ${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.")
        }
        taskSet.barrierPendingLaunchTasks.clear()
      }
    }
  }
  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  // `tasks` holds one (possibly empty) buffer per offer, so only mark a launch when some offer
  // actually received a task; merely having offers does not mean anything was launched.
  if (tasks.exists(_.nonEmpty)) {
    hasLaunchedTask = true
  }
  tasks.map(_.toSeq)
}