private DAGStatus _waitForCompletionWithStatusUpdates()

in tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java [345:387]


  /**
   * Polls the DAG status until the DAG is no longer in the RUNNING state and
   * returns the last status that was fetched.
   *
   * <p>The wait happens in two phases. First the status is polled until the
   * DAG reports RUNNING, SUCCEEDED, FAILED, KILLED or ERROR; a one-time
   * "waiting" message is logged if the DAG is observed in INITING or
   * SUBMITTED. Then, for as long as the state is RUNNING, the status is
   * re-fetched after each sleep of {@code SLEEP_FOR_COMPLETION}, and
   * {@code monitorProgress} is invoked on every iteration when
   * {@code vertexNames} is non-null.
   *
   * <p>This wait does not abort on interruption. If the thread is interrupted
   * while sleeping, polling continues, and the thread's interrupt status is
   * restored just before this method exits (normally or exceptionally) so the
   * caller can still observe the interrupt.
   *
   * @param vertexNames vertex names handed to {@code monitorProgress}; when
   *        {@code null}, {@code monitorProgress} is never called
   * @param statusGetOpts options passed to every {@code getDAGStatus} call and
   *        to the final {@code monitorProgress} call; may be {@code null}
   * @return the last {@link DAGStatus} fetched, whose state is no longer RUNNING
   * @throws IOException declared by this method; not raised directly here, so
   *         presumably propagated from the status calls
   * @throws TezException declared by this method; not raised directly here, so
   *         presumably propagated from the status calls
   */
  private DAGStatus _waitForCompletionWithStatusUpdates(@Nullable Set<String> vertexNames,
      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException {
    DAGStatus dagStatus;
    boolean initPrinted = false;
    // Thread.sleep clears the interrupt flag when it throws. Remember that it
    // happened so the flag can be restored on exit instead of being lost.
    boolean interrupted = false;
    double dagProgress = -1.0; // Print the first one
    try {
      // Phase 1: wait for the DAG to start running or reach an end state.
      while (true) {
        dagStatus = getDAGStatus(statusGetOpts);
        DAGStatus.State state = dagStatus.getState();
        if (!initPrinted
            && (state == DAGStatus.State.INITING || state == DAGStatus.State.SUBMITTED)) {
          initPrinted = true; // Print once
          log("Waiting for DAG to start running");
        }
        if (state == DAGStatus.State.RUNNING
            || state == DAGStatus.State.SUCCEEDED
            || state == DAGStatus.State.FAILED
            || state == DAGStatus.State.KILLED
            || state == DAGStatus.State.ERROR) {
          break;
        }
        try {
          Thread.sleep(SLEEP_FOR_COMPLETION);
        } catch (InterruptedException e) {
          // Keep waiting. Re-interrupting here would make every later sleep
          // throw immediately and turn this loop into a busy spin.
          interrupted = true;
        }
      }

      // Phase 2: report progress while the DAG is running.
      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
        if (vertexNames != null) {
          dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
        }
        try {
          Thread.sleep(SLEEP_FOR_COMPLETION);
        } catch (InterruptedException e) {
          interrupted = true; // Same reasoning as in phase 1
        }
        dagStatus = getDAGStatus(statusGetOpts);
      }

      if (vertexNames != null) {
        // Always print the last status irrespective of progress change
        monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
      }
      log("DAG completed. " + "FinalState=" + dagStatus.getState());
      return dagStatus;
    } finally {
      if (interrupted) {
        // Hand the interrupt back to the caller now that waiting is over.
        Thread.currentThread().interrupt();
      }
    }
  }