public TaskRunner2Result run()

in tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java [176:297]


  /**
   * Submits the task for execution, waits for it to finish, and sends the final status update
   * that corresponds to the first end reason which was registered.
   *
   * <p>Outline:
   * <ol>
   *   <li>While holding this object's monitor, and only if the runner is still in the running
   *       state, create the callable, register the task with the reporter and submit the
   *       callable to the executor. If the runner is no longer in the running state nothing is
   *       submitted and the already-registered end reason is returned.</li>
   *   <li>Block on the submitted future. Any throwable (unwrapped if it is an
   *       {@link ExecutionException}) is recorded as a {@code TASK_ERROR} when the runner is
   *       still in the running state.</li>
   *   <li>Dispatch on {@code firstEndReason} to decide whether a success / failure message must
   *       still be sent through the task reporter.</li>
   *   <li>In all cases: wait for any in-progress out-of-band error signal to complete,
   *       unregister the task, shut down the local executor (if any) and clear the calling
   *       thread's interrupt status.</li>
   * </ol>
   *
   * @return the value produced by {@code logAndReturnEndResult} for the final end reason
   */
  public TaskRunner2Result run() {
    try {
      Future<TaskRunner2CallableResult> future = null;
      synchronized (this) {
        // All running state changes must be made within a synchronized block to ensure
        // kills are issued or the task is not setup.
        if (isRunningState()) {
          // Safe to do this within a synchronized block because we're providing
          // the handler on which the Reporter will communicate back. Assuming
          // the register call doesn't end up hanging.
          taskRunnerCallable = new TaskRunner2Callable(task, ugi,
              umbilicalAndErrorHandler);
          taskReporter.registerTask(task, umbilicalAndErrorHandler);
          future = executor.submit(taskRunnerCallable);
        }
      }

      if (future == null) {
        // The task was never started; report whatever end reason was registered first.
        return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get());
      }

      TaskRunner2CallableResult executionResult = null;
      // The task started. Wait for it to complete.
      try {
        executionResult = future.get();
      } catch (Throwable e) {
        if (e instanceof ExecutionException) {
          // Report the failure raised by the callable rather than the Future's wrapper.
          e = e.getCause();
        }
        synchronized (this) {
          if (isRunningState()) {
            trySettingEndReason(EndReason.TASK_ERROR);
            registerFirstException(getTaskFailureType(e), e, null);
            LOG.warn("Exception from RunnerCallable", e);
          }
        }
      }
      // executionResult is still null here when future.get() threw.
      processCallableResult(executionResult);

      switch (firstEndReason) {
        case SUCCESS:
          try {
            taskReporter.taskSucceeded(task.getTaskAttemptID());
            return logAndReturnEndResult(EndReason.SUCCESS, null, null, stopContainerRequested.get());
          } catch (IOException | TezException e) {
            // Comm failure or failure from AM. Task can't do much.
            handleFinalStatusUpdateFailure(e, "success");
            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, firstTaskFailureType, e, stopContainerRequested.get());
          }
        case CONTAINER_STOP_REQUESTED:
          // Don't need to send any more communication updates to the AM.
          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, null, stopContainerRequested.get());
        case KILL_REQUESTED:
          // This was an external kill called directly on the task runner
          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, null, stopContainerRequested.get());
        case TASK_KILL_REQUEST:
          // Task reported a self kill
          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get());
        case COMMUNICATION_FAILURE:
          // Already seen a communication failure. There's no point trying to report another one.
          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get());
        case TASK_ERROR:
          // Don't report an error again if it was reported via signalFatalError
          if (!errorReporterToAm.get()) {
            String message;
            if (firstException instanceof FSError) {
              message = "Encountered an FSError while executing task: " + task.getTaskAttemptID();
            } else if (firstException instanceof Error) {
              message = "Encountered an Error while executing task: " + task.getTaskAttemptID();
            } else {
              message = "Error while running task ( failure ) : " + task.getTaskAttemptID();
            }
            try {
              taskReporter.taskFailed(task.getTaskAttemptID(), firstTaskFailureType, firstException, message, exceptionSourceInfo);
            } catch (IOException | TezException e) {
              // Comm failure or failure from AM. Task can't do much.
              handleFinalStatusUpdateFailure(e, "failure");
            }
          }
          // The returned result is the same whether or not the failure report went through.
          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get());
        default:
          LOG.error("Unexpected EndReason. File a bug");
          return logAndReturnEndResult(EndReason.TASK_ERROR, firstTaskFailureType, new RuntimeException("Unexpected EndReason"), stopContainerRequested.get());

      }
    } finally {
      // Wait for any out-of-band error signal that is still in progress before tearing down.
      boolean interruptedWhileWaiting = false;
      oobSignalLock.lock();
      try {
        while (oobSignalErrorInProgress) {
          try {
            oobSignalCondition.await();
          } catch (InterruptedException e) {
            // Do not re-assert the interrupt inside the loop: await() throws immediately when
            // the interrupt status is set on entry, which would turn this loop into a busy
            // spin (and, depending on the Condition implementation, may never release the
            // lock that the signalling thread needs). Remember it and restore it afterwards.
            LOG.warn("Interrupted while waiting for OOB fatal error to complete");
            interruptedWhileWaiting = true;
          }
        }
      } finally {
        oobSignalLock.unlock();
      }
      if (interruptedWhileWaiting) {
        // Restore the status so the remaining cleanup observes the same state as before.
        Thread.currentThread().interrupt();
      }
      taskReporter.unregisterTask(task.getTaskAttemptID());
      if (taskKillStartTime != 0) {
        LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime));
      }
      if (localExecutor != null) {
        localExecutor.shutdown();
      }
      // Clear the interrupted status of the blocking thread, in case it is set after the
      // InterruptedException was invoked.
      Thread.interrupted();
    }
  }