in core/src/main/scala/org/apache/spark/executor/Executor.scala [562:877]
override def run(): Unit = {
// Classloader isolation
val isolatedSession = taskDescription.artifacts.state match {
case Some(jobArtifactState) =>
isolatedSessionCache.get(jobArtifactState.uuid, () => newSessionState(jobArtifactState))
case _ => defaultSessionState
}
setMDCForTask(taskName, mdcProperties)
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(log"Running ${LogMDC(TASK_NAME, taskName)}")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStartTimeNs: Long = 0
var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()
var taskStarted: Boolean = false
try {
// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskDescription.properties)
updateDependencies(
taskDescription.artifacts.files,
taskDescription.artifacts.jars,
taskDescription.artifacts.archives,
isolatedSession)
// Always reset the thread class loader to ensure if any updates, all threads (not only
// the thread that updated the dependencies) can update to the new class loader.
Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
val killReason = reasonIfKilled
if (killReason.isDefined) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException(killReason.get)
}
// The purpose of updating the epoch here is to invalidate executor map output status cache
// in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
// MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
// we don't need to make any special calls here.
if (!isLocal) {
logDebug(s"$taskName's epoch is ${task.epoch}")
env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
}
metricsPoller.onTaskStart(taskId, task.stageId, task.stageAttemptId)
taskStarted = true
// Run the actual task and measure its runtime.
taskStartTimeNs = System.nanoTime()
taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
var threwException = true
// Convert resources amounts info to ResourceInformation
val resources = taskDescription.resources.map { case (rName, addressesAmounts) =>
rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
}
val value = Utils.tryWithSafeFinally {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem,
cpus = taskDescription.cpus,
resources = resources,
plugins = plugins)
threwException = false
res
} {
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0 && !threwException) {
val errMsg = log"Managed memory leak detected; size = " +
log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"
if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
} else {
logWarning(errMsg)
}
}
if (releasedLocks.nonEmpty && !threwException) {
val errMsg =
log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +
log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +
log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"
if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
} else {
logInfo(errMsg)
}
}
}
task.context.fetchFailed.foreach { fetchFailure =>
// uh-oh. it appears the user code has caught the fetch-failure without throwing any
// other exceptions. Its *possible* this is what the user meant to do (though highly
// unlikely). So we will log an error and keep going.
logError(log"${LogMDC(TASK_NAME, taskName)} completed successfully though internally " +
log"it encountered unrecoverable fetch failures! Most likely this means user code " +
log"is incorrectly swallowing Spark's internal " +
log"${LogMDC(CLASS_NAME, classOf[FetchFailedException])}", fetchFailure)
}
val taskFinishNs = System.nanoTime()
val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
// If the task has been killed, let's fail it.
task.context.killTaskIfInterrupted()
val resultSer = env.serializer.newInstance()
val beforeSerializationNs = System.nanoTime()
val valueByteBuffer = SerializerHelper.serializeToChunkedBuffer(resultSer, value)
val afterSerializationNs = System.nanoTime()
// Deserialization happens in two parts: first, we deserialize a Task object, which
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
task.metrics.setExecutorDeserializeTime(TimeUnit.NANOSECONDS.toMillis(
(taskStartTimeNs - deserializeStartTimeNs) + task.executorDeserializeTimeNs))
task.metrics.setExecutorDeserializeCpuTime(
(taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
task.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
(taskFinishNs - taskStartTimeNs) * taskDescription.cpus
- task.executorDeserializeTimeNs))
task.metrics.setExecutorCpuTime(
(taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
afterSerializationNs - beforeSerializationNs))
task.metrics.setPeakOnHeapExecutionMemory(taskMemoryManager.getPeakOnHeapExecutionMemory)
task.metrics.setPeakOffHeapExecutionMemory(taskMemoryManager.getPeakOffHeapExecutionMemory)
// Expose task metrics using the Dropwizard metrics system.
// Update task metrics counters
executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
executorSource.METRIC_RUN_TIME.inc(task.metrics.executorRunTime)
executorSource.METRIC_JVM_GC_TIME.inc(task.metrics.jvmGCTime)
executorSource.METRIC_DESERIALIZE_TIME.inc(task.metrics.executorDeserializeTime)
executorSource.METRIC_DESERIALIZE_CPU_TIME.inc(task.metrics.executorDeserializeCpuTime)
executorSource.METRIC_RESULT_SERIALIZE_TIME.inc(task.metrics.resultSerializationTime)
executorSource.METRIC_INPUT_BYTES_READ
.inc(task.metrics.inputMetrics.bytesRead)
executorSource.METRIC_INPUT_RECORDS_READ
.inc(task.metrics.inputMetrics.recordsRead)
executorSource.METRIC_OUTPUT_BYTES_WRITTEN
.inc(task.metrics.outputMetrics.bytesWritten)
executorSource.METRIC_OUTPUT_RECORDS_WRITTEN
.inc(task.metrics.outputMetrics.recordsWritten)
executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
incrementShuffleMetrics(executorSource, task.metrics)
// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
// try to estimate a reasonable upper bound of DirectTaskResult serialization
val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult,
valueByteBuffer.size + accumUpdates.size * 32 + metricPeaks.length * 8)
val resultSize = serializedDirectResult.size
// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(log"Finished ${LogMDC(TASK_NAME, taskName)}. " +
log"Result is larger than maxResultSize " +
log"(${LogMDC(RESULT_SIZE_BYTES, Utils.bytesToString(resultSize))} > " +
log"${LogMDC(RESULT_SIZE_BYTES_MAX, Utils.bytesToString(maxResultSize))}), " +
log"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId,
serializedDirectResult,
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." +
log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." +
log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent to driver")
// toByteBuffer is safe here, guarded by maxDirectResultSize
serializedDirectResult.toByteBuffer
}
}
executorSource.SUCCEEDED_TASKS.inc(1L)
setTaskFinishedAndClearInterruptStatus()
plugins.foreach(_.onTaskSucceeded())
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
case t: TaskKilledException =>
logInfo(log"Executor killed ${LogMDC(TASK_NAME, taskName)}," +
log" reason: ${LogMDC(REASON, t.reason)}")
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
// Here and below, put task metric peaks in an immutable.ArraySeq to expose them as an
// immutable.Seq without requiring a copy.
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks)
plugins.foreach(_.onTaskFailed(reason))
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
case _: InterruptedException | NonFatal(_) if
task != null && task.reasonIfKilled.isDefined =>
val killReason = task.reasonIfKilled.getOrElse("unknown reason")
logInfo(log"Executor interrupted and killed ${LogMDC(TASK_NAME, taskName)}," +
log" reason: ${LogMDC(REASON, killReason)}")
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks)
plugins.foreach(_.onTaskFailed(reason))
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
case t: Throwable if hasFetchFailure && !Executor.isFatalError(t, killOnFatalErrorDepth) =>
val reason = task.context.fetchFailed.get.toTaskFailedReason
if (!t.isInstanceOf[FetchFailedException]) {
// there was a fetch failure in the task, but some user code wrapped that exception
// and threw something else. Regardless, we treat it as a fetch failure.
logWarning(log"${LogMDC(TASK_NAME, taskName)} encountered a " +
log"${LogMDC(CLASS_NAME, classOf[FetchFailedException].getName)} " +
log"and failed, but the " +
log"${LogMDC(CLASS_NAME, classOf[FetchFailedException].getName)} " +
log"was hidden by another exception. Spark is handling this like a fetch failure " +
log"and ignoring the other exception: ${LogMDC(ERROR, t)}")
}
setTaskFinishedAndClearInterruptStatus()
plugins.foreach(_.onTaskFailed(reason))
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskCommitDeniedReason
setTaskFinishedAndClearInterruptStatus()
plugins.foreach(_.onTaskFailed(reason))
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
case t: Throwable if env.isStopped =>
// Log the expected exception after executor.stop without stack traces
// see: SPARK-19147
logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}: ${LogMDC(ERROR, t.getMessage)}")
case t: Throwable =>
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
// the default uncaught exception handler, which will terminate the Executor.
logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}", t)
// SPARK-20904: Do not report failure to driver if if happened during shut down. Because
// libraries may set up shutdown hooks that race with running tasks during shutdown,
// spurious failures may occur and can result in improper accounting in the driver (e.g.
// the task failure would not be ignored if the shutdown happened because of preemption,
// instead of an app issue).
if (!ShutdownHookManager.inShutdown()) {
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
val (taskFailureReason, serializedTaskFailureReason) = {
try {
val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
.withMetricPeaks(metricPeaks)
(ef, ser.serialize(ef))
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
.withMetricPeaks(metricPeaks)
(ef, ser.serialize(ef))
}
}
setTaskFinishedAndClearInterruptStatus()
plugins.foreach(_.onTaskFailed(taskFailureReason))
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskFailureReason)
} else {
logInfo("Not reporting error to driver during JVM shutdown.")
}
// Don't forcibly exit unless the exception was inherently fatal, to avoid
// stopping other tasks unnecessarily.
if (Executor.isFatalError(t, killOnFatalErrorDepth)) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
}
} finally {
cleanMDCForTask(taskName, mdcProperties)
runningTasks.remove(taskId)
if (taskStarted) {
// This means the task was successfully deserialized, its stageId and stageAttemptId
// are known, and metricsPoller.onTaskStart was called.
metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
}
}
}