in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [929:1002]
/**
 * Decides whether the given DAG has reached a terminal state, based on its
 * vertex counters and its registered termination cause.
 *
 * <p>While {@code numCompletedVertices} differs from {@code numVertices} the
 * DAG is treated as still running and its current internal state is returned
 * unchanged. Once the two counters are equal, the finish time is recorded and
 * the DAG is moved (via {@code dag.finished(...)}) to:
 * <ul>
 *   <li>{@code SUCCEEDED} - every vertex succeeded and no termination cause
 *       is set;</li>
 *   <li>{@code KILLED} - the termination cause is {@code DAG_KILL};</li>
 *   <li>{@code FAILED} - the termination cause is {@code VERTEX_FAILURE},
 *       {@code COMMIT_FAILURE} or {@code RECOVERY_FAILURE};</li>
 *   <li>{@code ERROR} - any other combination (catch all).</li>
 * </ul>
 * Every non-success outcome also logs a diagnostic message and attaches it
 * to the DAG.
 *
 * @param dag the DAG whose counters and termination cause are inspected
 * @return the value returned by {@code dag.finished(...)} when all vertices
 *         have completed, otherwise {@code dag.getInternalState()}
 */
static DAGState checkDAGForCompletion(DAGImpl dag) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Checking dag completion"
        + ", numCompletedVertices=" + dag.numCompletedVertices
        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
        + ", numFailedVertices=" + dag.numFailedVertices
        + ", numKilledVertices=" + dag.numKilledVertices
        + ", numVertices=" + dag.numVertices
        + ", terminationCause=" + dag.terminationCause);
  }

  // More completions than vertices means the bookkeeping is off; report it
  // but keep going.
  if (dag.numCompletedVertices > dag.numVertices) {
    LOG.error("vertex completion accounting issue: numCompletedVertices > numVertices"
        + ", numCompletedVertices=" + dag.numCompletedVertices
        + ", numVertices=" + dag.numVertices);
  }

  // Vertices still outstanding: the DAG is not finished, report its current state.
  if (dag.numCompletedVertices != dag.numVertices) {
    return dag.getInternalState();
  }

  dag.setFinishTime();
  final DAGTerminationCause cause = dag.terminationCause;

  // Success requires both a clean sweep of vertices and no registered cause.
  if (cause == null && dag.numSuccessfulVertices == dag.numVertices) {
    return dag.finished(DAGState.SUCCEEDED);
  }

  // Map each recognised termination cause to its message prefix and terminal state.
  final String reason;
  final DAGState terminalState;
  if (cause == DAGTerminationCause.DAG_KILL) {
    reason = "DAG killed due to user-initiated kill.";
    terminalState = DAGState.KILLED;
  } else if (cause == DAGTerminationCause.VERTEX_FAILURE) {
    reason = "DAG failed due to vertex failure.";
    terminalState = DAGState.FAILED;
  } else if (cause == DAGTerminationCause.COMMIT_FAILURE) {
    reason = "DAG failed due to commit failure.";
    terminalState = DAGState.FAILED;
  } else if (cause == DAGTerminationCause.RECOVERY_FAILURE) {
    reason = "DAG failed due to failure in recovery handling.";
    terminalState = DAGState.FAILED;
  } else {
    reason = null;
    terminalState = DAGState.ERROR;
  }

  if (reason == null) {
    // Catch all: every vertex is done but no known rule explains the outcome.
    final String unresolved = "All vertices complete, but cannot determine final state of DAG"
        + ", numCompletedVertices=" + dag.numCompletedVertices
        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
        + ", numFailedVertices=" + dag.numFailedVertices
        + ", numKilledVertices=" + dag.numKilledVertices
        + ", numVertices=" + dag.numVertices
        + ", terminationCause=" + cause;
    LOG.error(unresolved);
    dag.addDiagnostic(unresolved);
    return dag.finished(terminalState);
  }

  // Recognised kill/failure causes share the same diagnostic suffix and reporting steps.
  final String summary = reason
      + " failedVertices:" + dag.numFailedVertices
      + " killedVertices:" + dag.numKilledVertices;
  LOG.info(summary);
  dag.addDiagnostic(summary);
  return dag.finished(terminalState);
}