public DAGStatus getDAGStatus()

in tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java [153:235]


  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
      final long timeout) throws TezException, IOException {

    Preconditions.checkArgument(timeout >= -1, "Timeout must be >= -1");
    // Short circuit a timeout of 0.
    if (timeout == 0) {
      return getDAGStatusInternal(statusOptions, timeout);
    }

    long startTime = System.currentTimeMillis();

    DAGStatus dagStatus = cachedDAGStatusRef.getValue();
    boolean refreshStatus = true;
    if (dagStatus == null) {
      // the first lookup only or when the cachedDAG has expired
      dagStatus = getDAGStatus(statusOptions);
      refreshStatus = false;
    }

    // Handling when client dag status init or submitted. This really implies that the RM was
    // contacted to get status. INITING is never used. DAG_INITING implies a DagState of RUNNING.
    if (dagStatus.getState() == DAGStatus.State.INITING
        || dagStatus.getState() == DAGStatus.State.SUBMITTED) {
      long timeoutAbsolute = startTime + timeout;
      while (timeout < 0
          || (timeout > 0 && timeoutAbsolute > System.currentTimeMillis())) {
        if (refreshStatus) {
          // Try fetching the state with a timeout, in case the AM is already up.
          dagStatus = getDAGStatusInternal(statusOptions, timeout);
        }
        refreshStatus = true; // For the next iteration of the loop.

        if (dagStatus.getState() == DAGStatus.State.RUNNING) {
          // Refreshed status indicates that the DAG is running.
          // This status could have come from the AM or the RM - client sleep if RM, otherwise send request to the AM.
          if (dagStatus.getSource() == DagStatusSource.AM) {
            // RUNNING + AM should only happen if timeout is > -1.
            // Otherwise the AM ignored the -1 value, or the AM source in the DAGStatus is invalid.
            Preconditions.checkState(timeout > -1, "Should not reach here with a timeout of -1. File a bug");
            return dagStatus;
          } else {
            // From the RM. Fall through to the Sleep.
          }
        } else if(dagStatus.getState() == DAGStatus.State.SUCCEEDED
            || dagStatus.getState() == DAGStatus.State.FAILED
            || dagStatus.getState() == DAGStatus.State.KILLED
            || dagStatus.getState() == DAGStatus.State.ERROR) {
          // Again, check if this was from the RM. If it was, try getting it from a more informative source.
          if (dagStatus.getSource() == DagStatusSource.RM) {
            return getDAGStatusInternal(statusOptions, 0);
          } else {
            return dagStatus;
          }
        }
        // Sleep before checking again.
        long currentStatusPollInterval;
        if (timeout < 0) {
          currentStatusPollInterval = statusPollInterval;
        } else {
          long remainingTimeout = timeoutAbsolute - System.currentTimeMillis();
          if (remainingTimeout < 0) {
            // Timeout expired. Return the latest known dag status.
            return dagStatus;
          } else {
            currentStatusPollInterval = remainingTimeout < statusPollInterval ? remainingTimeout : statusPollInterval;
          }
        }
        try {
          Thread.sleep(currentStatusPollInterval);
        } catch (InterruptedException e) {
          throw new TezException(e);
        }
      }// End of while
      // Timeout may have expired before a single refresh
      if (refreshStatus) {
        return getDAGStatus(statusOptions);
      } else {
        return dagStatus;
      }
    } else { // Already running, or complete. Fallback to regular dagStatus with a timeout.
      return getDAGStatusInternal(statusOptions, timeout);
    }
  }