public TaskState restoreFromEvent()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java [508:601]


  public TaskState restoreFromEvent(HistoryEvent historyEvent) {
    switch (historyEvent.getEventType()) {
      case TASK_STARTED:
      {
        TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
        recoveryStartEventSeen = true;
        this.scheduledTime = tEvent.getScheduledTime();
        if (this.attempts == null
            || this.attempts.isEmpty()) {
          this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
        }
        recoveredState = TaskState.SCHEDULED;
        finishedAttempts = 0;
        return recoveredState;
      }
      case TASK_FINISHED:
      {
        TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
        if (!recoveryStartEventSeen
            && !tEvent.getState().equals(TaskState.KILLED)) {
          throw new TezUncheckedException("Finished Event seen but"
              + " no Started Event was encountered earlier"
              + ", taskId=" + taskId
              + ", finishState=" + tEvent.getState());
        }
        recoveredState = tEvent.getState();
        if (tEvent.getState() == TaskState.SUCCEEDED
            && tEvent.getSuccessfulAttemptID() != null) {
          successfulAttempt = tEvent.getSuccessfulAttemptID();
        }
        return recoveredState;
      }
      case TASK_ATTEMPT_STARTED:
      {
        TaskAttemptStartedEvent taskAttemptStartedEvent =
            (TaskAttemptStartedEvent) historyEvent;
        TaskAttempt recoveredAttempt = createRecoveredEvent(
            taskAttemptStartedEvent.getTaskAttemptID());
        recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Adding restored attempt into known attempts map"
              + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
        }
        this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
            recoveredAttempt);
        ++numberUncompletedAttempts;
        this.recoveredState = TaskState.RUNNING;
        return recoveredState;
      }
      case TASK_ATTEMPT_FINISHED:
      {
        TaskAttemptFinishedEvent taskAttemptFinishedEvent =
            (TaskAttemptFinishedEvent) historyEvent;
        TaskAttempt taskAttempt = this.attempts.get(
            taskAttemptFinishedEvent.getTaskAttemptID());
        finishedAttempts++;
        if (taskAttempt == null) {
          LOG.warn("Received an attempt finished event for an attempt that "
              + " never started or does not exist"
              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
              + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
          TaskAttempt recoveredAttempt = createRecoveredEvent(
              taskAttemptFinishedEvent.getTaskAttemptID());
          this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
              recoveredAttempt);
          if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) {
            throw new TezUncheckedException("Could not find task attempt"
                + " when trying to recover"
                + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
                + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState());
          }
          return recoveredState;
        }
        --numberUncompletedAttempts;
        if (numberUncompletedAttempts < 0) {
          throw new TezUncheckedException("Invalid recovery event for attempt finished"
              + ", more completions than starts encountered"
              + ", taskId=" + taskId
              + ", finishedAttempts=" + finishedAttempts
              + ", incompleteAttempts=" + numberUncompletedAttempts);
        }
        TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
            taskAttemptFinishedEvent);
        if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {
          recoveredState = TaskState.SUCCEEDED;
          successfulAttempt = taskAttempt.getID();
        }
        return recoveredState;
      }
      default:
        throw new RuntimeException("Unexpected event received for restoring"
            + " state, eventType=" + historyEvent.getEventType());
    }
  }