in core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala [1873:2347]
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
val task = event.task
val stageId = task.stageId
outputCommitCoordinator.taskCompleted(
stageId,
task.stageAttemptId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
if (!stageIdToStage.contains(task.stageId)) {
// The stage may have already finished when we get this event -- e.g. maybe it was a
// speculative task. It is important that we send the TaskEnd event in any case, so listeners
// are properly notified and can choose to handle it. For instance, some listeners do
// their own accounting, and if they never get the task end event they will think the
// tasks are still running when they really aren't.
postTaskEnd(event)
// Skip all the actions if the stage has been cancelled.
return
}
val stage = stageIdToStage(task.stageId)
// Make sure the task's accumulators are updated before any other processing happens, so
// that we can post a task end event before any jobs or stages are updated. Note that the
// accumulators are only updated for certain completion reasons (see the match below).
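// For example (illustrative, mirroring the Success case below): a successful ResultTask
// only merges its accumulator updates when its output partition has not already been
// marked finished by an earlier (possibly speculative) attempt, so metrics such as
// records read are not double counted:
//
//   if (!job.finished(rt.outputId)) { updateAccumulators(event) }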
event.reason match {
case Success =>
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case None => // Ignore update if task's job has finished.
}
case _ =>
updateAccumulators(event)
}
case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
case _ =>
}
if (trackingCacheVisibility) {
// Update rdd blocks' visibility status.
blockManagerMaster.updateRDDBlockVisibility(
event.taskInfo.taskId, visible = event.reason == Success)
}
postTaskEnd(event)
event.reason match {
case Success =>
// An earlier attempt of a stage (which is now a zombie) may still have running tasks. If
// those tasks complete, they still count, and we can mark the corresponding partitions as
// finished if the stage is determinate. Here we notify the task scheduler to skip running
// tasks for the same partition, to save resources.
if (!stage.isIndeterminate && task.stageAttemptId < stage.latestInfo.attemptNumber()) {
taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
}
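// Illustrative scenario (hypothetical ids): stage 4 attempt 0 became a zombie after a
// fetch failure, but one of its tasks for partition 7 later succeeds. If stage 4 is
// determinate, that output is still valid, so the newer attempt is told to skip
// partition 7 via taskScheduler.notifyPartitionCompletion(4, 7).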
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because ResultTasks are only used by ResultStages.
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
cleanupStateForJobAndIndependentStages(job)
try {
// killAllTaskAttempts will fail if a SchedulerBackend does not implement
// killTask.
logInfo(log"Job ${MDC(JOB_ID, job.jobId)} is finished. Cancelling " +
log"potential speculative or zombie tasks for this job")
// ResultStage is only used by this job. It's safe to kill speculative or
// zombie tasks in this stage.
taskScheduler.killAllTaskAttempts(
stageId,
shouldInterruptTaskThread(job),
reason = "Stage finished")
} catch {
case e: UnsupportedOperationException =>
logWarning(log"Could not cancel tasks " +
log"for stage ${MDC(STAGE, stageId)}", e)
}
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
// taskSucceeded runs some user code that might throw an exception. Make sure
// we are resilient against that.
try {
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Throwable if !Utils.isFatalError(e) =>
// TODO: Perhaps we want to mark the resultStage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
}
case None =>
logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished")
}
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
// Ignore task completions from older attempts of an indeterminate stage
val ignoreIndeterminate = stage.isIndeterminate &&
task.stageAttemptId < stage.latestInfo.attemptNumber()
if (!ignoreIndeterminate) {
shuffleStage.pendingPartitions -= task.partitionId
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (executorFailureEpoch.contains(execId) &&
smt.epoch <= executorFailureEpoch(execId)) {
logInfo(log"Ignoring possibly bogus ${MDC(STAGE, smt)} completion from " +
log"executor ${MDC(EXECUTOR_ID, execId)}")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
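// Illustrative example (hypothetical epochs): if executorFailureEpoch("exec-1") == 5, a
// MapStatus from a ShuffleMapTask launched at epoch 4 is treated as possibly bogus and
// dropped in the branch above, while one from a task launched at epoch 6 (after the
// failure) is registered below.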
mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
}
} else {
logInfo(log"Ignoring ${MDC(TASK_NAME, smt)} completion from an older attempt of indeterminate stage")
}
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
checkAndScheduleShuffleMergeFinalize(shuffleStage)
} else {
processShuffleMapStageCompletion(shuffleStage)
}
}
}
case FetchFailed(bmAddress, shuffleId, _, mapIndex, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleIdToMapStage(shuffleId)
if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
logInfo(log"Ignoring fetch failure from " +
log"${MDC(TASK_ID, task)} as it's from " +
log"${MDC(FAILED_STAGE, failedStage)} attempt " +
log"${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} and there is a more recent attempt for " +
log"that stage (attempt " +
log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running")
} else {
val ignoreStageFailure = ignoreDecommissionFetchFailure &&
isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress)
if (ignoreStageFailure) {
logInfo(log"Ignoring fetch failure from ${MDC(TASK_NAME, task)} of " +
log"${MDC(FAILED_STAGE, failedStage)} attempt " +
log"${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} when count " +
log"${MDC(MAX_ATTEMPTS, config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key)} " +
log"as executor ${MDC(EXECUTOR_ID, bmAddress.executorId)} is decommissioned and " +
log"${MDC(CONFIG, config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key)}=true")
} else {
failedStage.failedAttemptIds.add(task.stageAttemptId)
}
val shouldAbortStage =
failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
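// For example, if many reduce tasks all fail fetching from the same lost executor, only
// the first FetchFailed processed here still finds the stage in `runningStages` and marks
// it as failed; the later ones fall into the else branch below.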
if (runningStages.contains(failedStage)) {
logInfo(log"Marking ${MDC(FAILED_STAGE, failedStage)} " +
log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) as failed " +
log"due to a fetch failure from ${MDC(STAGE, mapStage)} " +
log"(${MDC(STAGE_NAME, mapStage.name)})")
markStageAsFinished(failedStage, errorMessage = Some(failureMessage),
willRetry = !shouldAbortStage)
} else {
logDebug(s"Received fetch failure from $task, but it's from $failedStage which is no " +
"longer running")
}
if (mapStage.rdd.isBarrier()) {
// Mark all the map outputs of the map stage as broken, to ensure that all the tasks are
// retried on the resubmitted stage attempt.
// TODO: SPARK-35547: Clean all push-based shuffle metadata like merge enabled and
// TODO: finalized as we are clearing all the merge results.
mapOutputTracker.unregisterAllMapAndMergeOutput(shuffleId)
} else if (mapIndex != -1) {
// Mark the map output whose fetch failed as broken in the map stage
mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
if (pushBasedShuffleEnabled) {
// Possibly unregister the merge result <shuffleId, reduceId>, if the FetchFailed
// mapIndex is part of the merge result of <shuffleId, reduceId>
mapOutputTracker.
unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
}
} else {
// Unregister the merge result of <shuffleId, reduceId> if there is a FetchFailed event
// that is not a metadata fetch failure (which is signified by bmAddress being null)
if (bmAddress != null &&
bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
assert(pushBasedShuffleEnabled, "Push based shuffle expected to " +
"be enabled when handling merge block fetch failure.")
mapOutputTracker.
unregisterMergeResult(shuffleId, reduceId, bmAddress, None)
}
}
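// To summarize the three cases above: a barrier map stage drops all map and merge outputs
// for the shuffle; a known mapIndex drops that single map output (and, with push-based
// shuffle, any merge result for <shuffleId, reduceId> it contributed to); and a failure
// reported by a shuffle merger (mapIndex == -1) drops only that merge result.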
if (failedStage.rdd.isBarrier()) {
failedStage match {
case failedMapStage: ShuffleMapStage =>
// Mark all the map outputs of the map stage as broken, to ensure that all the tasks are
// retried on the resubmitted stage attempt.
mapOutputTracker.unregisterAllMapAndMergeOutput(failedMapStage.shuffleDep.shuffleId)
case failedResultStage: ResultStage =>
// Abort the failed result stage since we may have committed output for some
// partitions.
val reason = "Could not recover from a failed barrier ResultStage. Most recent " +
s"failure reason: $failureMessage"
abortStage(failedResultStage, reason, None)
}
}
if (shouldAbortStage) {
val abortMessage = if (disallowStageRetryForTest) {
"Fetch failure will not retry stage due to testing config"
} else {
s"$failedStage (${failedStage.name}) has failed the maximum allowable number of " +
s"times: $maxConsecutiveStageAttempts. Most recent failure reason:\n" +
failureMessage
}
abortStage(failedStage, abortMessage, None)
} else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
// TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
val noResubmitEnqueued = !failedStages.contains(failedStage)
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
// If the map stage is INDETERMINATE, which means the map tasks may return different
// results when retried, we need to retry all the tasks of the failed stage and its
// succeeding stages, because their input data will change once the map tasks are
// re-run.
// Note that if the map stage is UNORDERED, we are fine: the shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are retried.
if (mapStage.isIndeterminate) {
// It's a little tricky to find all the succeeding stages of `mapStage`, because each
// stage only knows its parents, not its children. Here we traverse the stages starting
// from the leaf nodes (the result stages of active jobs), and roll back all the stages
// in the stage chains that connect to `mapStage`. To speed up the traversal, we first
// collect the stages that need to be rolled back. If a stage needs to roll back, all of
// its succeeding stages need to roll back too.
val stagesToRollback = HashSet[Stage](mapStage)
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
stageChain.drop(1).foreach(s => stagesToRollback += s)
} else {
stageChain.head.parents.foreach { s =>
collectStagesToRollback(s :: stageChain)
}
}
}
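// Illustrative walk-through (hypothetical stages): suppose an active job's lineage is
// mapStage -> stageB -> resultStage. Starting from the leaf, collectStagesToRollback is
// called with resultStage :: Nil, recurses through the parents to
// stageB :: resultStage :: Nil and then mapStage :: stageB :: resultStage :: Nil; at that
// point the head (mapStage) is already in stagesToRollback, so the rest of the chain
// (stageB and resultStage) is added to the rollback set as well.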
def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}
activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
// Collect the stages that will actually be rolled back, after the checks below.
val rollingBackStages = HashSet[Stage](mapStage)
stagesToRollback.foreach {
case mapStage: ShuffleMapStage =>
val numMissingPartitions = mapStage.findMissingPartitions().length
if (numMissingPartitions < mapStage.numTasks) {
if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
val reason = "A shuffle map stage with indeterminate output was failed " +
"and retried. However, Spark can only do this while using the new " +
"shuffle block fetching protocol. Please check the config " +
"'spark.shuffle.useOldFetchProtocol', see more detail in " +
"SPARK-27665 and SPARK-25341."
abortStage(mapStage, reason, None)
} else {
rollingBackStages += mapStage
}
}
case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
}
case _ =>
}
logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
log"we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
}
// We expect one executor failure to trigger many FetchFailures in rapid succession,
// but all of those task failures can typically be handled by a single resubmission of
// the failed stage. We avoid flooding the scheduler's event queue with resubmit
// messages by checking whether a resubmit is already in the event queue for the
// failed stage. If there is already a resubmit enqueued for a different failed
// stage, that event would also be sufficient to handle the current failed stage, but
// producing a resubmit for each failed stage makes debugging and logging a little
// simpler while not producing an overwhelming number of scheduler events.
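// Illustrative timeline (hypothetical counts): if 50 FetchFailed events for this stage
// arrive within a few milliseconds, only the first one sees noResubmitEnqueued == true
// and schedules a single ResubmitFailedStages message after RESUBMIT_TIMEOUT; the other
// 49 only add to `failedStages`, which is already populated.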
logInfo(
log"Resubmitting ${MDC(STAGE, mapStage)} " +
log"(${MDC(STAGE_NAME, mapStage.name)}) and ${MDC(FAILED_STAGE, failedStage)} " +
log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to fetch failure")
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
},
DAGScheduler.RESUBMIT_TIMEOUT,
TimeUnit.MILLISECONDS
)
}
}
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
val isHostDecommissioned = taskScheduler
.getExecutorDecommissionState(bmAddress.executorId)
.exists(_.workerHost.isDefined)
// Shuffle output of all executors on host `bmAddress.host` may be lost if:
// - External shuffle service is enabled, so we assume that all shuffle data on the
// node is bad.
// - Host is decommissioned, thus all executors on that host will die.
val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled ||
isHostDecommissioned
val hostToUnregisterOutputs = if (shuffleOutputOfEntireHostLost
&& unRegisterOutputOnHostOnFetchFailure) {
Some(bmAddress.host)
} else {
// Unregister shuffle data just for one executor (we don't have any
// reason to believe shuffle data has been lost for the entire host).
None
}
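// Illustrative example (hypothetical hosts): with the external shuffle service enabled
// and unRegisterOutputOnHostOnFetchFailure set, a fetch failure pointing at an executor
// on host "host-a" unregisters the map outputs of every executor on "host-a", since the
// shuffle files there are served by the node-local service and are all considered
// suspect.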
removeExecutorAndUnregisterOutputs(
execId = bmAddress.executorId,
fileLost = true,
hostToUnregisterOutputs = hostToUnregisterOutputs,
maybeEpoch = Some(task.epoch),
// shuffleFileLostEpoch is ignored when a host is decommissioned because some
// decommissioned executors on that host might have been removed before this fetch
// failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and
// proceed with unconditional removal of shuffle outputs from all executors on that
// host, including from those that we still haven't confirmed as lost due to heartbeat
// delays.
ignoreShuffleFileLostEpoch = isHostDecommissioned)
}
}
case failure: TaskFailedReason if task.isBarrier =>
// Also handle the specific task failure reason here, before the common barrier handling below.
failure match {
case Resubmitted =>
handleResubmittedFailure(task, stage)
case _ => // Do nothing.
}
// Always fail the current stage and retry all the tasks when a barrier task fails.
val failedStage = stageIdToStage(task.stageId)
if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
logInfo(log"Ignoring task failure from ${MDC(TASK_NAME, task)} as it's from " +
log"${MDC(FAILED_STAGE, failedStage)} attempt ${MDC(STAGE_ATTEMPT, task.stageAttemptId)} " +
log"and there is a more recent attempt for that stage (attempt " +
log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running")
} else {
logInfo(log"Marking ${MDC(STAGE_ID, failedStage.id)} (${MDC(STAGE_NAME, failedStage.name)}) " +
log"as failed due to a barrier task failed.")
val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
failure.toErrorString
try {
// killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
"failed."
val job = jobIdToActiveJob.get(failedStage.firstJobId)
val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)
} catch {
case e: UnsupportedOperationException =>
// Cannot continue with the barrier stage if we failed to cancel its zombie barrier tasks.
// TODO SPARK-24877 leave the zombie tasks and ignore their completion events.
logWarning(log"Could not kill all tasks for stage ${MDC(STAGE_ID, stageId)}", e)
abortStage(failedStage, "Could not kill zombie barrier tasks for stage " +
s"$failedStage (${failedStage.name})", Some(e))
}
markStageAsFinished(failedStage, Some(message))
failedStage.failedAttemptIds.add(task.stageAttemptId)
// TODO Refactor the failure handling logic to combine similar code with that of
// FetchFailed.
val shouldAbortStage =
failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest
if (shouldAbortStage) {
val abortMessage = if (disallowStageRetryForTest) {
"Barrier stage will not retry stage due to testing config. Most recent failure " +
s"reason: $message"
} else {
s"$failedStage (${failedStage.name}) has failed the maximum allowable number of " +
s"times: $maxConsecutiveStageAttempts. Most recent failure reason: $message"
}
abortStage(failedStage, abortMessage, None)
} else {
failedStage match {
case failedMapStage: ShuffleMapStage =>
// Mark all the map outputs of the map stage as broken, to ensure that all the tasks are
// retried on the resubmitted stage attempt.
mapOutputTracker.unregisterAllMapAndMergeOutput(failedMapStage.shuffleDep.shuffleId)
case failedResultStage: ResultStage =>
// Abort the failed result stage since we may have committed output for some
// partitions.
val reason = "Could not recover from a failed barrier ResultStage. Most recent " +
s"failure reason: $message"
abortStage(failedResultStage, reason, None)
}
// In case multiple task failures are triggered for a single stage attempt, ensure we only
// resubmit the failed stage once.
val noResubmitEnqueued = !failedStages.contains(failedStage)
failedStages += failedStage
if (noResubmitEnqueued) {
logInfo(log"Resubmitting ${MDC(FAILED_STAGE, failedStage)} " +
log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to barrier stage failure.")
messageScheduler.schedule(new Runnable {
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
}
}
case Resubmitted =>
handleResubmittedFailure(task, stage)
case _: TaskCommitDenied =>
// Do nothing here; it is left up to the TaskScheduler to decide how to handle denied commits.
case _: ExceptionFailure | _: TaskKilled =>
// Nothing left to do, already handled above for accumulator updates.
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
case _: ExecutorLostFailure | UnknownReason =>
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
}