in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java [1102:1227]
/**
 * Recovers {@code task} and returns the internal state it should move to.
 *
 * <p>When {@code taskEvent} is a {@link TaskEventRecoverTask} that names a desired state and
 * does not ask for data recovery, a desired SUCCEEDED, FAILED or KILLED state is returned
 * immediately, before any attempt is touched. Otherwise a TA_RECOVER event is sent for every
 * known attempt and the outcome is chosen from {@code task.recoveredState}:
 * <ul>
 *   <li>NEW: stays NEW until the vertex schedules the task;</li>
 *   <li>SCHEDULED, RUNNING, SUCCEEDED: handled by {@link #recoverActiveOrSucceededTask};</li>
 *   <li>KILLED, FAILED: the vertex is told the task completed in that state, and that state is
 *       returned;</li>
 *   <li>any other value: NEW.</li>
 * </ul>
 *
 * @param task the task being recovered
 * @param taskEvent the event that triggered recovery
 * @return the internal state the task transitions to
 */
public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
  TaskStateInternal requestedState = requestedTerminalState(taskEvent);
  if (requestedState != null) {
    return requestedState;
  }

  if (task.attempts != null) {
    for (TaskAttempt attempt : task.attempts.values()) {
      task.eventHandler.handle(
          new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_RECOVER));
    }
  }
  LOG.info("Trying to recover task"
      + ", taskId=" + task.getTaskId()
      + ", recoveredState=" + task.recoveredState);

  switch (task.recoveredState) {
    case NEW:
      // Nothing to do until the vertex schedules this task.
      return TaskStateInternal.NEW;
    case SCHEDULED:
    case RUNNING:
    case SUCCEEDED:
      return recoverActiveOrSucceededTask(task);
    case KILLED:
      // Nothing left to run; only the vertex needs to hear about it.
      task.eventHandler.handle(new VertexEventTaskCompleted(task.taskId,
          getExternalState(TaskStateInternal.KILLED)));
      return TaskStateInternal.KILLED;
    case FAILED:
      // Nothing left to run; only the vertex needs to hear about it.
      task.eventHandler.handle(new VertexEventTaskCompleted(task.taskId,
          getExternalState(TaskStateInternal.FAILED)));
      return TaskStateInternal.FAILED;
  }
  // A recovered state not listed above leaves the task in NEW.
  return TaskStateInternal.NEW;
}

/**
 * Returns the terminal state requested by a recover event, or {@code null} when normal recovery
 * should proceed. A state is only returned when the event is a {@link TaskEventRecoverTask} with
 * a non-null desired state of SUCCEEDED, FAILED or KILLED and {@code recoverData()} is false.
 */
private TaskStateInternal requestedTerminalState(TaskEvent taskEvent) {
  if (!(taskEvent instanceof TaskEventRecoverTask)) {
    return null;
  }
  TaskEventRecoverTask recoverEvent = (TaskEventRecoverTask) taskEvent;
  if (recoverEvent.getDesiredState() == null || recoverEvent.recoverData()) {
    return null;
  }
  // TODO recover attempts if desired state is given?
  // History may not have all data.
  switch (recoverEvent.getDesiredState()) {
    case SUCCEEDED:
      return TaskStateInternal.SUCCEEDED;
    case FAILED:
      return TaskStateInternal.FAILED;
    case KILLED:
      return TaskStateInternal.KILLED;
    default:
      // Any other desired state falls through to normal recovery.
      return null;
  }
}

/**
 * Recovers a task whose recorded state was SCHEDULED, RUNNING or SUCCEEDED.
 *
 * <p>If a successful attempt is recorded and its committed output can be recovered, the
 * finished event is written to history and the vertex is notified of both task and attempt
 * completion, and SUCCEEDED is returned. Otherwise the task fails when the attempt count has
 * reached {@code maxFailedAttempts}. If it does not fail, a new attempt is scheduled when every
 * existing attempt has finished, and RUNNING is returned.
 */
private TaskStateInternal recoverActiveOrSucceededTask(TaskImpl task) {
  if (task.successfulAttempt != null) {
    if (recoverCommittedOutputs(task)) {
      LOG.info("Recovered a successful attempt"
          + ", taskAttemptId=" + task.successfulAttempt.toString());
      task.logJobHistoryTaskFinishedEvent();
      task.eventHandler.handle(new VertexEventTaskCompleted(task.taskId,
          getExternalState(TaskStateInternal.SUCCEEDED)));
      task.eventHandler.handle(new VertexEventTaskAttemptCompleted(
          task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED));
      return TaskStateInternal.SUCCEEDED;
    }
    // Output could not be recovered, so the recorded success is discarded and the task is
    // treated like one that never succeeded.
    task.successfulAttempt = null;
  }

  // NOTE(review): this compares every recorded attempt (not only failed ones) against
  // maxFailedAttempts — confirm that is intended.
  int attemptCount = task.attempts.size();
  if (attemptCount >= task.maxFailedAttempts) {
    // Exceeded max attempts.
    task.finished(TaskStateInternal.FAILED);
    return TaskStateInternal.FAILED;
  }
  // With every attempt completed, a fresh one is needed. If any attempt is still incomplete,
  // that attempt moving to failed is expected to trigger a new attempt if possible.
  if (attemptCount == task.finishedAttempts) {
    task.addAndScheduleAttempt();
  }
  return TaskStateInternal.RUNNING;
}

/**
 * Asks each of the vertex's output committers to recover this task's output, passing the
 * current application attempt id minus one as the previous attempt.
 *
 * @return true when there are no committers or every committer recovered successfully; false at
 *     the first committer that does not support task recovery or throws while recovering
 */
private boolean recoverCommittedOutputs(TaskImpl task) {
  if (task.getVertex().getOutputCommitters() == null
      || task.getVertex().getOutputCommitters().isEmpty()) {
    return true;
  }
  for (Entry<String, OutputCommitter> output
      : task.getVertex().getOutputCommitters().entrySet()) {
    LOG.info("Recovering data for task from previous DAG attempt"
        + ", taskId=" + task.getTaskId()
        + ", output=" + output.getKey());
    OutputCommitter committer = output.getValue();
    if (!committer.isTaskRecoverySupported()) {
      LOG.info("Task recovery not supported by committer"
          + ", failing task attempt"
          + ", taskId=" + task.getTaskId()
          + ", attemptId=" + task.successfulAttempt
          + ", output=" + output.getKey());
      return false;
    }
    try {
      // The previous-attempt id is computed inside the try, so a failure while computing it is
      // also reported as a recovery failure.
      committer.recoverTask(task.getTaskId().getId(),
          task.appContext.getApplicationAttemptId().getAttemptId() - 1);
    } catch (Exception e) {
      LOG.warn("Task recovery failed by committer"
          + ", taskId=" + task.getTaskId()
          + ", attemptId=" + task.successfulAttempt
          + ", output=" + output.getKey(), e);
      return false;
    }
  }
  return true;
}