public TaskStateInternal transition()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java [1102:1227]


    /**
     * Chooses the internal state a task lands in when it is being recovered.
     *
     * <p>When the event is a {@code TaskEventRecoverTask} that carries a
     * non-null desired state and reports {@code recoverData() == false}, a
     * desired state of SUCCEEDED, FAILED or KILLED is returned straight away
     * and nothing else is touched. In every other case a {@code TA_RECOVER}
     * event is dispatched for each known attempt and the outcome is derived
     * from {@code task.recoveredState}.
     *
     * @param task the task being recovered
     * @param taskEvent the event that triggered this transition
     * @return the internal state the task moves to; {@code NEW} when the
     *         recovered state is NEW or matches none of the handled states
     */
    public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
      if (taskEvent instanceof TaskEventRecoverTask) {
        TaskEventRecoverTask recoverEvent = (TaskEventRecoverTask) taskEvent;
        boolean desiredStateOnly = recoverEvent.getDesiredState() != null
            && !recoverEvent.recoverData();
        if (desiredStateOnly) {
          // TODO recover attempts if desired state is given?
          // History may not have all data.
          switch (recoverEvent.getDesiredState()) {
            case SUCCEEDED:
              return TaskStateInternal.SUCCEEDED;
            case FAILED:
              return TaskStateInternal.FAILED;
            case KILLED:
              return TaskStateInternal.KILLED;
          }
          // Any other desired state falls through to the regular recovery.
        }
      }

      // Ask every attempt known to this task to recover itself.
      if (task.attempts != null) {
        for (TaskAttempt attempt : task.attempts.values()) {
          TaskAttemptEvent recoverAttempt = new TaskAttemptEvent(
              attempt.getID(), TaskAttemptEventType.TA_RECOVER);
          task.eventHandler.handle(recoverAttempt);
        }
      }

      LOG.info("Trying to recover task" + ", taskId=" + task.getTaskId()
          + ", recoveredState=" + task.recoveredState);

      switch (task.recoveredState) {
        case SCHEDULED:
        case RUNNING:
        case SUCCEEDED:
          return recoverStartedTask(task);
        case KILLED:
          return informVertexOfTerminalState(task, TaskStateInternal.KILLED);
        case FAILED:
          return informVertexOfTerminalState(task, TaskStateInternal.FAILED);
        case NEW:
          // Nothing to do until the vertex schedules this task
        default:
          return TaskStateInternal.NEW;
      }
    }

    /**
     * Handles recovery of a task whose recovered state is SCHEDULED, RUNNING
     * or SUCCEEDED.
     *
     * <p>If a successful attempt is recorded and its output can be recovered,
     * the finish is logged to history, the vertex is told about the task and
     * attempt completion, and SUCCEEDED is returned. If output recovery fails
     * the successful attempt is cleared and the task is treated as unfinished.
     *
     * @param task the task being recovered
     * @return SUCCEEDED, FAILED or RUNNING as described above
     */
    private TaskStateInternal recoverStartedTask(TaskImpl task) {
      if (task.successfulAttempt != null) {
        if (recoverCommitterOutputs(task)) {
          LOG.info("Recovered a successful attempt"
              + ", taskAttemptId=" + task.successfulAttempt.toString());
          task.logJobHistoryTaskFinishedEvent();
          task.eventHandler.handle(
              new VertexEventTaskCompleted(task.taskId,
                  getExternalState(TaskStateInternal.SUCCEEDED)));
          task.eventHandler.handle(
              new VertexEventTaskAttemptCompleted(
                  task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED));
          return TaskStateInternal.SUCCEEDED;
        }
        // Output could not be recovered: forget the successful attempt.
        task.successfulAttempt = null;
      }

      // NOTE(review): this compares the total number of attempts (not a
      // failed-attempt count) against maxFailedAttempts - confirm intended.
      if (task.attempts.size() >= task.maxFailedAttempts) {
        // Exceeded max attempts
        task.finished(TaskStateInternal.FAILED);
        return TaskStateInternal.FAILED;
      }

      // No successful attempt. When every attempt has completed, schedule a
      // new one. If some attempt is still incomplete, it is expected to move
      // to failed and its update will trigger a new attempt if possible.
      if (task.attempts.size() == task.finishedAttempts) {
        task.addAndScheduleAttempt();
      }
      return TaskStateInternal.RUNNING;
    }

    /**
     * Asks each of the vertex's output committers to recover this task's
     * output from the previous application attempt.
     *
     * @param task the task whose output is being recovered
     * @return {@code true} when there are no committers or all of them
     *         recovered the task; {@code false} as soon as one committer does
     *         not support task recovery or throws while recovering
     */
    private boolean recoverCommitterOutputs(TaskImpl task) {
      if (task.getVertex().getOutputCommitters() == null
          || task.getVertex().getOutputCommitters().isEmpty()) {
        return true;
      }
      for (Entry<String, OutputCommitter> output
          : task.getVertex().getOutputCommitters().entrySet()) {
        LOG.info("Recovering data for task from previous DAG attempt"
            + ", taskId=" + task.getTaskId()
            + ", output=" + output.getKey());
        OutputCommitter outputCommitter = output.getValue();
        if (!outputCommitter.isTaskRecoverySupported()) {
          LOG.info("Task recovery not supported by committer"
              + ", failing task attempt"
              + ", taskId=" + task.getTaskId()
              + ", attemptId=" + task.successfulAttempt
              + ", output=" + output.getKey());
          return false;
        }
        try {
          // Second argument is the current application attempt id minus one.
          outputCommitter.recoverTask(task.getTaskId().getId(),
              task.appContext.getApplicationAttemptId().getAttemptId()-1);
        } catch (Exception e) {
          LOG.warn("Task recovery failed by committer"
              + ", taskId=" + task.getTaskId()
              + ", attemptId=" + task.successfulAttempt
              + ", output=" + output.getKey(), e);
          return false;
        }
      }
      return true;
    }

    /**
     * Tells the vertex that the task completed in the given terminal state
     * and hands that same state back to the caller.
     *
     * @param task the recovered task
     * @param terminalState the internal state to report (KILLED or FAILED)
     * @return {@code terminalState}, unchanged
     */
    private TaskStateInternal informVertexOfTerminalState(TaskImpl task,
        TaskStateInternal terminalState) {
      // Nothing to do other than inform the vertex
      task.eventHandler.handle(
          new VertexEventTaskCompleted(task.taskId,
              getExternalState(terminalState)));
      return terminalState;
    }