private DAGStatus _waitForCompletionWithStatusUpdates()

in tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java [561:616]


  private DAGStatus _waitForCompletionWithStatusUpdates(long timeMs,
                                                        boolean vertexUpdates,
                                                        @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
    DAGStatus dagStatus;
    boolean initPrinted = false;
    boolean runningPrinted = false;
    double dagProgress = -1.0; // Print the first one
    // monitoring
    Long maxNs = timeMs >= 0 ? (System.nanoTime() + (timeMs * 1000000L)) : null;
    while (true) {
      try {
        dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
      } catch (DAGNotRunningException ex) {
        return null;
      }
      if (!initPrinted
          && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) {
        initPrinted = true; // Print once
        log("Waiting for DAG to start running");
      }
      if (dagStatus.getState() == DAGStatus.State.RUNNING
          || dagStatus.getState() == DAGStatus.State.SUCCEEDED
          || dagStatus.getState() == DAGStatus.State.FAILED
          || dagStatus.getState() == DAGStatus.State.KILLED
          || dagStatus.getState() == DAGStatus.State.ERROR) {
        break;
      }
      if (maxNs != null && System.nanoTime() > maxNs) {
        return null;
      }
    }// End of while(true)

    Set<String> vertexNames = Collections.emptySet();
    while (!dagStatus.isCompleted()) {
      if (!runningPrinted) {
        log("DAG initialized: CurrentState=Running");
        runningPrinted = true;
      }
      if (vertexUpdates && vertexNames.isEmpty()) {
        vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
      }
      dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
      try {
        dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
      } catch (DAGNotRunningException ex) {
        return null;
      }
      if (maxNs != null && System.nanoTime() > maxNs) {
        return null;
      }
    }// end of while
    // Always print the last status irrespective of progress change
    monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
    log("DAG completed. " + "FinalState=" + dagStatus.getState());
    return dagStatus;
  }