in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [1436:1573]
/**
 * Checks whether all tasks of the given vertex have completed and, if so,
 * moves the vertex to its final state.
 *
 * <p>When every task has completed:
 * <ul>
 *   <li>If all tasks succeeded and no termination cause is registered, the
 *       vertex's non-shared outputs are committed (at most once, guarded by
 *       {@code vertex.committed}, and only when
 *       {@code vertex.commitVertexOutputs} is set) and the vertex finishes as
 *       {@code SUCCEEDED}. A failure to persist the commit-start event
 *       finishes the vertex as {@code FAILED} with cause
 *       {@code INTERNAL_ERROR}; any other commit exception finishes it as
 *       {@code FAILED} with cause {@code COMMIT_FAILURE}.</li>
 *   <li>Otherwise the registered termination cause selects the final state:
 *       {@code DAG_KILL} and {@code OTHER_VERTEX_FAILURE} lead to
 *       {@code KILLED}; {@code OWN_TASK_FAILURE} and {@code INTERNAL_ERROR}
 *       lead to {@code FAILED}. In each of these cases the vertex is aborted
 *       before it is finished.</li>
 * </ul>
 *
 * @param vertex the vertex to inspect
 * @return the state returned by {@code vertex.finished(...)} if all tasks
 *         have completed, otherwise the vertex's current internal state
 * @throws TezUncheckedException if all tasks have completed but neither the
 *         success condition nor a known termination cause applies
 */
static VertexState checkVertexForCompletion(final VertexImpl vertex) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Checking for vertex completion for "
        + vertex.logIdentifier
        + ", numTasks=" + vertex.numTasks
        + ", failedTaskCount=" + vertex.failedTaskCount
        + ", killedTaskCount=" + vertex.killedTaskCount
        + ", successfulTaskCount=" + vertex.succeededTaskCount
        + ", completedTaskCount=" + vertex.completedTaskCount
        + ", terminationCause=" + vertex.terminationCause);
  }

  final int totalTasks = vertex.tasks.size();

  // More completions than tasks indicates an accounting bug; it is only
  // reported here, and the vertex is treated as not yet finished.
  if (vertex.completedTaskCount > totalTasks) {
    LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
        + " for vertex " + vertex.logIdentifier
        + ", numTasks=" + vertex.numTasks
        + ", failedTaskCount=" + vertex.failedTaskCount
        + ", killedTaskCount=" + vertex.killedTaskCount
        + ", successfulTaskCount=" + vertex.succeededTaskCount
        + ", completedTaskCount=" + vertex.completedTaskCount
        + ", terminationCause=" + vertex.terminationCause);
  }

  if (vertex.completedTaskCount != totalTasks) {
    // Vertex not finished yet: report the current state.
    return vertex.getInternalState();
  }

  // Only succeed if all tasks completed successfully and no terminationCause
  // is registered.
  if (vertex.succeededTaskCount == totalTasks && vertex.terminationCause == null) {
    LOG.info("Vertex succeeded: " + vertex.logIdentifier);
    try {
      // getAndSet guarantees the commit sequence is entered only once.
      if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
        LOG.info("Invoking committer commit for vertex, vertexId="
            + vertex.logIdentifier);
        if (vertex.outputCommitters != null
            && !vertex.outputCommitters.isEmpty()) {
          boolean firstCommit = true;
          for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
            final OutputCommitter committer = entry.getValue();
            final String outputName = entry.getKey();
            if (vertex.sharedOutputs.contains(outputName)) {
              // Shared outputs are skipped here; the original comment states
              // they are committed by the DAG.
              continue;
            }
            if (firstCommit) {
              // Record the commit-start event exactly once, right before the
              // first actual commit. Clearing the flag here (rather than in
              // an unreachable else-branch) keeps later outputs from writing
              // duplicate commit-start events.
              firstCommit = false;
              try {
                vertex.appContext.getHistoryHandler().handleCriticalEvent(
                    new DAGHistoryEvent(vertex.getDAGId(),
                        new VertexCommitStartedEvent(vertex.vertexId,
                            vertex.clock.getTime())));
              } catch (IOException e) {
                LOG.error("Failed to persist commit start event to recovery, vertexId="
                    + vertex.logIdentifier, e);
                vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
                return vertex.finished(VertexState.FAILED);
              }
            }
            // Run the committer under the DAG's UGI.
            vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
              @Override
              public Void run() throws Exception {
                LOG.info("Invoking committer commit for output=" + outputName
                    + ", vertexId=" + vertex.logIdentifier);
                committer.commitOutput();
                return null;
              }
            });
          }
        }
      }
    } catch (Exception e) {
      LOG.error("Failed to do commit on vertex, vertexId="
          + vertex.logIdentifier, e);
      vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
      return vertex.finished(VertexState.FAILED);
    }
    return vertex.finished(VertexState.SUCCEEDED);
  }

  if (vertex.terminationCause == VertexTerminationCause.DAG_KILL) {
    vertex.setFinishTime();
    String diagnosticMsg = "Vertex killed due to user-initiated job kill. "
        + "failedTasks:"
        + vertex.failedTaskCount;
    LOG.info(diagnosticMsg);
    vertex.addDiagnostic(diagnosticMsg);
    vertex.abortVertex(VertexStatus.State.KILLED);
    return vertex.finished(VertexState.KILLED);
  }

  if (vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE) {
    vertex.setFinishTime();
    String diagnosticMsg = "Vertex killed as other vertex failed. "
        + "failedTasks:"
        + vertex.failedTaskCount;
    LOG.info(diagnosticMsg);
    vertex.addDiagnostic(diagnosticMsg);
    vertex.abortVertex(VertexStatus.State.KILLED);
    return vertex.finished(VertexState.KILLED);
  }

  if (vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE) {
    if (vertex.failedTaskCount == 0) {
      // Inconsistent bookkeeping: reported, but the vertex still fails.
      LOG.error("task failure accounting error. terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0");
    }
    vertex.setFinishTime();
    String diagnosticMsg = "Vertex failed as one or more tasks failed. "
        + "failedTasks:"
        + vertex.failedTaskCount;
    LOG.info(diagnosticMsg);
    vertex.addDiagnostic(diagnosticMsg);
    vertex.abortVertex(VertexStatus.State.FAILED);
    return vertex.finished(VertexState.FAILED);
  }

  if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) {
    vertex.setFinishTime();
    String diagnosticMsg = "Vertex failed/killed due to internal error. "
        + "failedTasks:"
        + vertex.failedTaskCount
        + " killedTasks:"
        + vertex.killedTaskCount;
    LOG.info(diagnosticMsg);
    // Record the diagnostic on the vertex, as the other termination branches do.
    vertex.addDiagnostic(diagnosticMsg);
    vertex.abortVertex(State.FAILED);
    return vertex.finished(VertexState.FAILED);
  }

  // Should never occur: all tasks are complete but no rule above applies.
  throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex"
      + ", failedTaskCount=" + vertex.failedTaskCount
      + ", killedTaskCount=" + vertex.killedTaskCount
      + ", successfulTaskCount=" + vertex.succeededTaskCount
      + ", completedTaskCount=" + vertex.completedTaskCount
      + ", terminationCause=" + vertex.terminationCause);
}