static DAGState checkDAGForCompletion()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [929:1002]


  /**
   * Inspects the vertex counters and termination cause of {@code dag} to decide whether the
   * DAG has reached a terminal state.
   *
   * <p>While {@code numCompletedVertices} differs from {@code numVertices}, only logging takes
   * place and the value of {@code dag.getInternalState()} is returned. Once the two counters
   * are equal, the finish time is set and the outcome is handed to {@code dag.finished(...)}:
   * <ul>
   *   <li>{@code SUCCEEDED} - every vertex succeeded and no termination cause is set;</li>
   *   <li>{@code KILLED} - the termination cause is {@code DAG_KILL};</li>
   *   <li>{@code FAILED} - the termination cause is {@code VERTEX_FAILURE},
   *       {@code COMMIT_FAILURE} or {@code RECOVERY_FAILURE};</li>
   *   <li>{@code ERROR} - any other combination of counters and termination cause.</li>
   * </ul>
   * Every outcome other than {@code SUCCEEDED} also logs a message and adds it to the DAG as a
   * diagnostic.
   *
   * @param dag the DAG whose completion is being evaluated
   * @return the value returned by {@code dag.finished(...)} when all vertices have completed,
   *         otherwise the value returned by {@code dag.getInternalState()}
   */
  static DAGState checkDAGForCompletion(DAGImpl dag) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Checking dag completion" + ", numCompletedVertices=" + dag.numCompletedVertices
          + ", numSuccessfulVertices=" + dag.numSuccessfulVertices + ", numFailedVertices="
          + dag.numFailedVertices + ", numKilledVertices=" + dag.numKilledVertices
          + ", numVertices=" + dag.numVertices + ", terminationCause=" + dag.terminationCause);
    }

    // More completions than vertices means the counters are inconsistent. This is only
    // reported; the equality test below will not match, so the current state is returned.
    if (dag.numCompletedVertices > dag.numVertices) {
      LOG.error("vertex completion accounting issue: numCompletedVertices > numVertices"
          + ", numCompletedVertices=" + dag.numCompletedVertices + ", numVertices="
          + dag.numVertices);
    }

    // Completed count does not match the vertex count: hand back the current state.
    if (dag.numCompletedVertices != dag.numVertices) {
      return dag.getInternalState();
    }

    dag.setFinishTime();

    // Success requires every vertex to have succeeded and no termination cause on record.
    if (dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
      return dag.finished(DAGState.SUCCEEDED);
    }

    // Map the recorded termination cause to a message prefix and a terminal state.
    final String reason;
    final DAGState terminalState;
    if (dag.terminationCause == DAGTerminationCause.DAG_KILL) {
      reason = "DAG killed due to user-initiated kill.";
      terminalState = DAGState.KILLED;
    } else if (dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE) {
      reason = "DAG failed due to vertex failure.";
      terminalState = DAGState.FAILED;
    } else if (dag.terminationCause == DAGTerminationCause.COMMIT_FAILURE) {
      reason = "DAG failed due to commit failure.";
      terminalState = DAGState.FAILED;
    } else if (dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE) {
      reason = "DAG failed due to failure in recovery handling.";
      terminalState = DAGState.FAILED;
    } else {
      // Catch-all: everything completed but none of the known outcomes applies.
      String unresolvedMsg = "All vertices complete, but cannot determine final state of DAG"
          + ", numCompletedVertices=" + dag.numCompletedVertices
          + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
          + ", numFailedVertices=" + dag.numFailedVertices
          + ", numKilledVertices=" + dag.numKilledVertices
          + ", numVertices=" + dag.numVertices
          + ", terminationCause=" + dag.terminationCause;
      LOG.error(unresolvedMsg);
      dag.addDiagnostic(unresolvedMsg);
      return dag.finished(DAGState.ERROR);
    }

    // Shared tail for the kill/failure outcomes: log, record the diagnostic, then finish.
    String outcomeMsg = reason
        + " failedVertices:" + dag.numFailedVertices
        + " killedVertices:" + dag.numKilledVertices;
    LOG.info(outcomeMsg);
    dag.addDiagnostic(outcomeMsg);
    return dag.finished(terminalState);
  }