in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java [508:601]
/**
 * Replays a single recovered history event against this task's in-memory
 * state and returns the task state implied by the events seen so far.
 *
 * <p>Handled event types:
 * <ul>
 *   <li>{@code TASK_STARTED}: marks the start event as seen, records the
 *       scheduled time, resets the finished-attempt count and moves the
 *       recovered state to {@link TaskState#SCHEDULED}.</li>
 *   <li>{@code TASK_FINISHED}: adopts the finish state carried by the event
 *       and, for {@link TaskState#SUCCEEDED} with a non-null attempt id,
 *       remembers that attempt as the successful one.</li>
 *   <li>{@code TASK_ATTEMPT_STARTED}: creates a recovered attempt, replays the
 *       event into it, registers it in the attempts map and moves the
 *       recovered state to {@link TaskState#RUNNING}.</li>
 *   <li>{@code TASK_ATTEMPT_FINISHED}: replays the event into the matching
 *       registered attempt; if that attempt ends SUCCEEDED the recovered state
 *       becomes {@link TaskState#SUCCEEDED}.</li>
 * </ul>
 *
 * @param historyEvent the recovered event to apply
 * @return the recovered task state after applying the event
 * @throws TezUncheckedException if the event stream is inconsistent: a
 *         non-KILLED task finish without a prior task start, a non-KILLED
 *         finish for an attempt that was never registered, or more attempt
 *         completions than attempt starts
 * @throws RuntimeException if the event type is not one handled here
 */
public TaskState restoreFromEvent(HistoryEvent historyEvent) {
  switch (historyEvent.getEventType()) {
    case TASK_STARTED:
    {
      TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
      recoveryStartEventSeen = true;
      this.scheduledTime = tEvent.getScheduledTime();
      // NOTE(review): presumably the attempts map may be null or an empty
      // placeholder before recovery starts; a fresh insertion-ordered map is
      // installed so recovered attempts can be added. Confirm against the
      // constructor.
      if (this.attempts == null
          || this.attempts.isEmpty()) {
        this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
      }
      recoveredState = TaskState.SCHEDULED;
      finishedAttempts = 0;
      return recoveredState;
    }
    case TASK_FINISHED:
    {
      TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
      // KILLED is the only finish state accepted without a preceding
      // TASK_STARTED event. Enum identity (!=) is used so a null state still
      // reaches the descriptive exception instead of an NPE.
      if (!recoveryStartEventSeen
          && tEvent.getState() != TaskState.KILLED) {
        throw new TezUncheckedException("Finished Event seen but"
            + " no Started Event was encountered earlier"
            + ", taskId=" + taskId
            + ", finishState=" + tEvent.getState());
      }
      recoveredState = tEvent.getState();
      if (tEvent.getState() == TaskState.SUCCEEDED
          && tEvent.getSuccessfulAttemptID() != null) {
        successfulAttempt = tEvent.getSuccessfulAttemptID();
      }
      return recoveredState;
    }
    case TASK_ATTEMPT_STARTED:
    {
      TaskAttemptStartedEvent taskAttemptStartedEvent =
          (TaskAttemptStartedEvent) historyEvent;
      // Despite its name, createRecoveredEvent yields the TaskAttempt that
      // will hold the recovered attempt state.
      TaskAttempt recoveredAttempt = createRecoveredEvent(
          taskAttemptStartedEvent.getTaskAttemptID());
      recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding restored attempt into known attempts map"
            + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
      }
      this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
          recoveredAttempt);
      // Balanced by the decrement when the matching TASK_ATTEMPT_FINISHED
      // event is replayed for this attempt.
      ++numberUncompletedAttempts;
      this.recoveredState = TaskState.RUNNING;
      return recoveredState;
    }
    case TASK_ATTEMPT_FINISHED:
    {
      TaskAttemptFinishedEvent taskAttemptFinishedEvent =
          (TaskAttemptFinishedEvent) historyEvent;
      TaskAttempt taskAttempt = this.attempts.get(
          taskAttemptFinishedEvent.getTaskAttemptID());
      finishedAttempts++;
      if (taskAttempt == null) {
        LOG.warn("Received an attempt finished event for an attempt that "
            + " never started or does not exist"
            + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
            + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
        // Register a placeholder for the unknown attempt id. It was never
        // counted as started, so numberUncompletedAttempts is not touched.
        TaskAttempt recoveredAttempt = createRecoveredEvent(
            taskAttemptFinishedEvent.getTaskAttemptID());
        this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
            recoveredAttempt);
        // Only a KILLED finish is tolerated for an attempt with no start.
        if (taskAttemptFinishedEvent.getState() != TaskAttemptState.KILLED) {
          throw new TezUncheckedException("Could not find task attempt"
              + " when trying to recover"
              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
              + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
        }
        return recoveredState;
      }
      --numberUncompletedAttempts;
      // A negative count means more finishes than starts were replayed.
      if (numberUncompletedAttempts < 0) {
        throw new TezUncheckedException("Invalid recovery event for attempt finished"
            + ", more completions than starts encountered"
            + ", taskId=" + taskId
            + ", finishedAttempts=" + finishedAttempts
            + ", incompleteAttempts=" + numberUncompletedAttempts);
      }
      TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
          taskAttemptFinishedEvent);
      if (taskAttemptState == TaskAttemptState.SUCCEEDED) {
        recoveredState = TaskState.SUCCEEDED;
        successfulAttempt = taskAttempt.getID();
      }
      return recoveredState;
    }
    default:
      throw new RuntimeException("Unexpected event received for restoring"
          + " state, eventType=" + historyEvent.getEventType());
  }
}