override def run()

in core/src/main/scala/org/apache/spark/executor/Executor.scala [562:877]
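
This is the body of the executor's TaskRunner: it selects the (possibly isolated) session class loader, deserializes the task, runs it while measuring wall-clock and CPU time, serializes the result and routes it back to the driver, and translates kill requests, fetch failures, commit denials, and other exceptions into the corresponding task status updates.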


    override def run(): Unit = {

      // Classloader isolation
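      // Each JobArtifactState (keyed by its UUID) gets its own cached session state, including a
      // dedicated REPL class loader, so tasks from different sessions do not see each other's
      // classes; tasks without an artifact state share the default session.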
      val isolatedSession = taskDescription.artifacts.state match {
        case Some(jobArtifactState) =>
          isolatedSessionCache.get(jobArtifactState.uuid, () => newSessionState(jobArtifactState))
        case _ => defaultSessionState
      }

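      // Record thread identity, initialize timers and the task memory manager, install the
      // session's REPL class loader, and tell the driver the task is now RUNNING.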
      setMDCForTask(taskName, mdcProperties)
      threadId = Thread.currentThread.getId
      Thread.currentThread.setName(threadName)
      val threadMXBean = ManagementFactory.getThreadMXBean
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      val deserializeStartTimeNs = System.nanoTime()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(log"Running ${LogMDC(TASK_NAME, taskName)}")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStartTimeNs: Long = 0
      var taskStartCpu: Long = 0
      startGCTime = computeTotalGcTime()
      var taskStarted: Boolean = false

      try {
        // Must be set before updateDependencies() is called, in case fetching dependencies
        // requires access to properties contained within (e.g. for access control).
        Executor.taskDeserializationProps.set(taskDescription.properties)

        updateDependencies(
          taskDescription.artifacts.files,
          taskDescription.artifacts.jars,
          taskDescription.artifacts.archives,
          isolatedSession)
        // Always reset the thread's context class loader so that, if dependencies were updated,
        // every thread (not only the one that performed the update) picks up the new class loader.
        Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
        task = ser.deserialize[Task[Any]](
          taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
        task.localProperties = taskDescription.properties
        task.setTaskMemoryManager(taskMemoryManager)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        val killReason = reasonIfKilled
        if (killReason.isDefined) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException(killReason.get)
        }

        // The purpose of updating the epoch here is to invalidate executor map output status cache
        // in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
        // MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
        // we don't need to make any special calls here.
        if (!isLocal) {
          logDebug(s"$taskName's epoch is ${task.epoch}")
          env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
        }

        metricsPoller.onTaskStart(taskId, task.stageId, task.stageAttemptId)
        taskStarted = true

        // Run the actual task and measure its runtime.
        taskStartTimeNs = System.nanoTime()
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        // Convert the per-resource address/amount info into ResourceInformation objects
        val resources = taskDescription.resources.map { case (rName, addressesAmounts) =>
          rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
        }
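        // Run the task body; the second argument to tryWithSafeFinally always runs afterwards,
        // releasing the task's block locks and execution memory and reporting leaks when the task
        // itself did not throw.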
        val value = Utils.tryWithSafeFinally {
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem,
            cpus = taskDescription.cpus,
            resources = resources,
            plugins = plugins)
          threwException = false
          res
        } {
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

          if (freedMemory > 0 && !threwException) {
            val errMsg = log"Managed memory leak detected; size = " +
              log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"
            if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
              throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
            } else {
              logWarning(errMsg)
            }
          }

          if (releasedLocks.nonEmpty && !threwException) {
            val errMsg =
              log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +
                log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +
                log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"
            if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
              throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
            } else {
              logInfo(errMsg)
            }
          }
        }
        task.context.fetchFailed.foreach { fetchFailure =>
          // Uh-oh. It appears the user code has caught the fetch failure without throwing any
          // other exceptions. It's *possible* this is what the user meant to do (though highly
          // unlikely). So we will log an error and keep going.
          logError(log"${LogMDC(TASK_NAME, taskName)} completed successfully though internally " +
            log"it encountered unrecoverable fetch failures! Most likely this means user code " +
            log"is incorrectly swallowing Spark's internal " +
            log"${LogMDC(CLASS_NAME, classOf[FetchFailedException])}", fetchFailure)
        }
        val taskFinishNs = System.nanoTime()
        val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L

        // If the task has been killed, let's fail it.
        task.context.killTaskIfInterrupted()

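        // Serialize the result value with the data serializer (env.serializer, not the closure
        // serializer), timing it so it can be reported as resultSerializationTime below.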
        val resultSer = env.serializer.newInstance()
        val beforeSerializationNs = System.nanoTime()
        val valueByteBuffer = SerializerHelper.serializeToChunkedBuffer(resultSer, value)
        val afterSerializationNs = System.nanoTime()

        // Deserialization happens in two parts: first, we deserialize a Task object, which
        // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
        task.metrics.setExecutorDeserializeTime(TimeUnit.NANOSECONDS.toMillis(
          (taskStartTimeNs - deserializeStartTimeNs) + task.executorDeserializeTimeNs))
        task.metrics.setExecutorDeserializeCpuTime(
          (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
        task.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
          (taskFinishNs - taskStartTimeNs) * taskDescription.cpus
            - task.executorDeserializeTimeNs))
        task.metrics.setExecutorCpuTime(
          (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
        task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
        task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
          afterSerializationNs - beforeSerializationNs))
        task.metrics.setPeakOnHeapExecutionMemory(taskMemoryManager.getPeakOnHeapExecutionMemory)
        task.metrics.setPeakOffHeapExecutionMemory(taskMemoryManager.getPeakOffHeapExecutionMemory)
        // Update the executor-level task metric counters, exposed via the Dropwizard metrics system.
        executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
        executorSource.METRIC_RUN_TIME.inc(task.metrics.executorRunTime)
        executorSource.METRIC_JVM_GC_TIME.inc(task.metrics.jvmGCTime)
        executorSource.METRIC_DESERIALIZE_TIME.inc(task.metrics.executorDeserializeTime)
        executorSource.METRIC_DESERIALIZE_CPU_TIME.inc(task.metrics.executorDeserializeCpuTime)
        executorSource.METRIC_RESULT_SERIALIZE_TIME.inc(task.metrics.resultSerializationTime)
        executorSource.METRIC_INPUT_BYTES_READ
          .inc(task.metrics.inputMetrics.bytesRead)
        executorSource.METRIC_INPUT_RECORDS_READ
          .inc(task.metrics.inputMetrics.recordsRead)
        executorSource.METRIC_OUTPUT_BYTES_WRITTEN
          .inc(task.metrics.outputMetrics.bytesWritten)
        executorSource.METRIC_OUTPUT_RECORDS_WRITTEN
          .inc(task.metrics.outputMetrics.recordsWritten)
        executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
        executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
        executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
        incrementShuffleMetrics(executorSource, task.metrics)

        // Note: accumulator updates must be collected after TaskMetrics is updated
        val accumUpdates = task.collectAccumulatorUpdates()
        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
        // TODO: do not serialize value twice
        val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
        // Try to estimate a reasonable upper bound for the serialized DirectTaskResult size
        val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult,
          valueByteBuffer.size + accumUpdates.size * 32 + metricPeaks.length * 8)
        val resultSize = serializedDirectResult.size

        // directSend = sending directly back to the driver
        val serializedResult: ByteBuffer = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(log"Finished ${LogMDC(TASK_NAME, taskName)}. " +
              log"Result is larger than maxResultSize " +
              log"(${LogMDC(RESULT_SIZE_BYTES, Utils.bytesToString(resultSize))} > " +
              log"${LogMDC(RESULT_SIZE_BYTES_MAX, Utils.bytesToString(maxResultSize))}), " +
              log"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize > maxDirectResultSize) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId,
              serializedDirectResult,
              StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." +
              log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." +
              log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent to driver")
            // toByteBuffer is safe here, guarded by maxDirectResultSize
            serializedDirectResult.toByteBuffer
          }
        }

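        // Success path: bump the succeeded-task counter, clear any pending interrupt, notify
        // plugins, and send the FINISHED status (with the serialized result) to the driver.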
        executorSource.SUCCEEDED_TASKS.inc(1L)
        setTaskFinishedAndClearInterruptStatus()
        plugins.foreach(_.onTaskSucceeded())
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        case t: TaskKilledException =>
          logInfo(log"Executor killed ${LogMDC(TASK_NAME, taskName)}," +
            log" reason: ${LogMDC(REASON, t.reason)}")

          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
          // Here and below, put task metric peaks in an immutable.ArraySeq to expose them as an
          // immutable.Seq without requiring a copy.
          val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
          val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks)
          plugins.foreach(_.onTaskFailed(reason))
          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))

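        // The task thread was interrupted (or hit a non-fatal error) while a kill was already
        // pending for this task; report it as killed rather than failed.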
        case _: InterruptedException | NonFatal(_) if
            task != null && task.reasonIfKilled.isDefined =>
          val killReason = task.reasonIfKilled.getOrElse("unknown reason")
          logInfo(log"Executor interrupted and killed ${LogMDC(TASK_NAME, taskName)}," +
            log" reason: ${LogMDC(REASON, killReason)}")

          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
          val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
          val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks)
          plugins.foreach(_.onTaskFailed(reason))
          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))

        case t: Throwable if hasFetchFailure && !Executor.isFatalError(t, killOnFatalErrorDepth) =>
          val reason = task.context.fetchFailed.get.toTaskFailedReason
          if (!t.isInstanceOf[FetchFailedException]) {
            // there was a fetch failure in the task, but some user code wrapped that exception
            // and threw something else.  Regardless, we treat it as a fetch failure.
            logWarning(log"${LogMDC(TASK_NAME, taskName)} encountered a " +
              log"${LogMDC(CLASS_NAME, classOf[FetchFailedException].getName)} " +
              log"and failed, but the " +
              log"${LogMDC(CLASS_NAME, classOf[FetchFailedException].getName)} " +
              log"was hidden by another exception. Spark is handling this like a fetch failure " +
              log"and ignoring the other exception: ${LogMDC(ERROR, t)}")
          }
          setTaskFinishedAndClearInterruptStatus()
          plugins.foreach(_.onTaskFailed(reason))
          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

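        // The task's attempt to commit its output was denied (a CommitDeniedException is in the
        // cause chain); report it as KILLED with a commit-denied reason rather than FAILED.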
        case CausedBy(cDE: CommitDeniedException) =>
          val reason = cDE.toTaskCommitDeniedReason
          setTaskFinishedAndClearInterruptStatus()
          plugins.foreach(_.onTaskFailed(reason))
          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))

        case t: Throwable if env.isStopped =>
          // Log the expected exception after executor.stop without stack traces
          // see: SPARK-19147
          logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}: ${LogMDC(ERROR, t.getMessage)}")

        case t: Throwable =>
          // Attempt to exit cleanly by informing the driver of our failure.
          // If anything goes wrong (or this was a fatal exception), we will delegate to
          // the default uncaught exception handler, which will terminate the Executor.
          logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}", t)

          // SPARK-20904: Do not report failure to driver if it happened during shutdown. Because
          // libraries may set up shutdown hooks that race with running tasks during shutdown,
          // spurious failures may occur and can result in improper accounting in the driver (e.g.
          // the task failure would not be ignored if the shutdown happened because of preemption,
          // instead of an app issue).
          if (!ShutdownHookManager.inShutdown()) {
            val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
            val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq

            val (taskFailureReason, serializedTaskFailureReason) = {
              try {
                val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
                  .withMetricPeaks(metricPeaks)
                (ef, ser.serialize(ef))
              } catch {
                case _: NotSerializableException =>
                  // t is not serializable so just send the stacktrace
                  val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
                    .withMetricPeaks(metricPeaks)
                  (ef, ser.serialize(ef))
              }
            }
            setTaskFinishedAndClearInterruptStatus()
            plugins.foreach(_.onTaskFailed(taskFailureReason))
            execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskFailureReason)
          } else {
            logInfo("Not reporting error to driver during JVM shutdown.")
          }

          // Don't forcibly exit unless the exception was inherently fatal, to avoid
          // stopping other tasks unnecessarily.
          if (Executor.isFatalError(t, killOnFatalErrorDepth)) {
            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
          }
      } finally {
        cleanMDCForTask(taskName, mdcProperties)
        runningTasks.remove(taskId)
        if (taskStarted) {
          // This means the task was successfully deserialized, its stageId and stageAttemptId
          // are known, and metricsPoller.onTaskStart was called.
          metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
        }
      }
    }
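
For orientation, the three-way routing of the serialized result above (drop oversized results, push large ones through the block manager, send small ones inline with the status update) can be summarized as a small pure function. The sketch below is for illustration only and is not part of the Spark source; the type and function names are invented.

    // Sketch only: the result-routing rule used in the `serializedResult` block above.
    sealed trait ResultRoute
    case object DropTooLarge extends ResultRoute    // only the size is reported, via IndirectTaskResult
    case object ViaBlockManager extends ResultRoute // bytes stored in the block manager, referenced indirectly
    case object DirectToDriver extends ResultRoute  // bytes sent inline with the FINISHED status update

    def routeResult(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): ResultRoute =
      if (maxResultSize > 0 && resultSize > maxResultSize) DropTooLarge
      else if (resultSize > maxDirectResultSize) ViaBlockManager
      else DirectToDriver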