in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1297:1423]
/**
 * Moves a DAG that is being recovered to the state implied by its recovered history.
 *
 * <p>If the recover event carries a desired state, it overrides {@code dag.recoveredState}.
 * Any additional classpath URLs carried by the event are added to the classpath before
 * the recovered state is acted on.
 *
 * <ul>
 *   <li>NEW: re-sends DAG_INIT and a start event to the DAG; stays NEW.</li>
 *   <li>INITED: sends a recover-to-RUNNING event to every root vertex; returns RUNNING.</li>
 *   <li>RUNNING: if a DAG commit or any vertex-group commit was started but not seen to
 *       finish, the DAG is failed with COMMIT_FAILURE (commits are not recoverable).
 *       Otherwise root vertices are recovered as RUNNING.</li>
 *   <li>SUCCEEDED / ERROR / FAILED / KILLED: every vertex is recovered to a matching final
 *       state and the AM is told the DAG finished.</li>
 *   <li>anything else: the AM is told ERROR and FAILED is returned.</li>
 * </ul>
 *
 * @param dag the DAG being recovered
 * @param dagEvent the triggering event; must be a {@link DAGEventRecoverEvent}
 * @return the state the DAG transitions to
 */
public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
  DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
  if (recoverEvent.hasDesiredState()) {
    // DAG completed or final end state known
    dag.recoveredState = recoverEvent.getDesiredState();
  }
  if (recoverEvent.getAdditionalUrlsForClasspath() != null) {
    LOG.info("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath()
        + "] to classpath");
    RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath());
  }
  switch (dag.recoveredState) {
    case NEW:
      // send DAG an Init and start events
      dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
      dag.eventHandler.handle(new DAGEventStartDag(dag.getID(), null));
      return DAGState.NEW;
    case INITED:
      // DAG inited but not started. The vertices may already have been sent
      // init/start events, but the DAG start may not have been persisted, so
      // root vertices are recovered as RUNNING.
      recoverRootVerticesAsRunning(dag);
      return DAGState.RUNNING;
    case RUNNING:
      // If a commit is in progress, the DAG must fail, as commits are not recoverable.
      // hasPendingGroupCommit is evaluated first so every pending group is logged.
      if (hasPendingGroupCommit(dag) || dag.recoveryCommitInProgress) {
        return failOnIncompleteCommit(dag);
      }
      recoverRootVerticesAsRunning(dag);
      return DAGState.RUNNING;
    case SUCCEEDED:
    case ERROR:
    case FAILED:
    case KILLED:
      // Completed: recover all vertices to the final state matching the DAG's.
      recoverAllVertices(dag, vertexStateForFinishedDag(dag.recoveredState));
      // Let us inform AM of completion
      dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
          dag.recoveredState));
      LOG.info("Recovered DAG: " + dag.getID() + " finished with state: "
          + dag.recoveredState);
      return dag.recoveredState;
    default:
      // Error state
      LOG.warn("Trying to recover DAG, failed to recover"
          + " from non-handled state: " + dag.recoveredState);
      // Tell AM ERROR so that it can shutdown.
      // NOTE(review): the AM is told ERROR while the DAG itself returns FAILED; kept as-is.
      dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
          DAGState.ERROR));
      return DAGState.FAILED;
  }
}

/**
 * Returns true if any recovered vertex-group commit was started but not seen to finish.
 * Every pending group is logged, not only the first one found.
 */
private boolean hasPendingGroupCommit(DAGImpl dag) {
  boolean pending = false;
  for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
    // false = commit started but not finished; true = commit completed.
    if (!entry.getValue().booleanValue()) {
      LOG.info("Found a pending Vertex Group commit"
          + ", vertexGroup=" + entry.getKey());
      pending = true;
    }
  }
  return pending;
}

/**
 * Fails a recovered RUNNING DAG whose commit (DAG-level or group) did not complete,
 * recovers its vertices, persists the failure and notifies the AM.
 *
 * @return {@link DAGState#FAILED}
 */
private DAGState failOnIncompleteCommit(DAGImpl dag) {
  dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
  dag.setFinishTime();
  // NOTE(review): vertices are recovered as SUCCEEDED although the DAG ends FAILED.
  // This matches prior behaviour: the state mapping used here was evaluated with
  // recoveredState == RUNNING, which always yields SUCCEEDED. Confirm this is intended.
  recoverAllVertices(dag, VertexState.SUCCEEDED);
  DAGState endState = DAGState.FAILED;
  try {
    dag.logJobHistoryUnsuccesfulEvent(endState);
  } catch (IOException e) {
    // Best effort: recovery proceeds even if the completion event cannot be persisted.
    LOG.warn("Failed to persist recovery event for DAG completion"
        + ", dagId=" + dag.dagId
        + ", finalState=" + endState, e);
  }
  dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), endState));
  return endState;
}

/** Sends a recover-to-RUNNING event to every vertex that has no input vertices. */
private void recoverRootVerticesAsRunning(DAGImpl dag) {
  for (Vertex v : dag.vertices.values()) {
    if (v.getInputVerticesCount() == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Sending Running Recovery event to root vertex "
            + v.getName());
      }
      dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
          VertexState.RUNNING));
    }
  }
}

/** Sends a recover event with {@code desiredState} to every vertex of the DAG. */
private void recoverAllVertices(DAGImpl dag, VertexState desiredState) {
  for (Vertex v : dag.vertices.values()) {
    dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), desiredState));
  }
}

/**
 * Maps a finished DAG state to the final state its vertices are recovered to:
 * KILLED to KILLED, ERROR/FAILED to FAILED, and anything else to SUCCEEDED.
 */
private VertexState vertexStateForFinishedDag(DAGState dagState) {
  switch (dagState) {
    case KILLED:
      return VertexState.KILLED;
    case ERROR:
    case FAILED:
      return VertexState.FAILED;
    default:
      return VertexState.SUCCEEDED;
  }
}