in tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java [153:235]
// Returns the status of the DAG, waiting while the DAG has not yet started running.
//
// statusOptions: optional set of status options, forwarded unchanged to the underlying lookups.
// timeout: wait budget in milliseconds (it is added to System.currentTimeMillis() to form the
//          deadline). 0 performs a single lookup without the polling logic below; -1 keeps
//          polling with no deadline; any value below -1 is rejected.
//
// Flow:
//   1. Start from the cached status, or do a plain lookup if nothing is cached.
//   2. If that status is INITING or SUBMITTED, poll until the DAG is reported RUNNING by the AM,
//      reaches a terminal state, or the deadline passes; sleep up to statusPollInterval between
//      polls.
//   3. Otherwise delegate directly to getDAGStatusInternal with the caller's timeout.
//
// Throws IllegalArgumentException (via Preconditions) when timeout < -1, and TezException when
// the polling sleep is interrupted; in that case the thread's interrupt status is restored first.
public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
    final long timeout) throws TezException, IOException {
  Preconditions.checkArgument(timeout >= -1, "Timeout must be >= -1");
  // Short circuit a timeout of 0.
  if (timeout == 0) {
    return getDAGStatusInternal(statusOptions, timeout);
  }

  long startTime = System.currentTimeMillis();
  DAGStatus dagStatus = cachedDAGStatusRef.getValue();
  // False only when dagStatus was fetched just now, so the first loop iteration can reuse it.
  boolean refreshStatus = true;
  if (dagStatus == null) {
    // the first lookup only or when the cachedDAG has expired
    dagStatus = getDAGStatus(statusOptions);
    refreshStatus = false;
  }

  // Handling when client dag status init or submitted. This really implies that the RM was
  // contacted to get status. INITING is never used. DAG_INITING implies a DagState of RUNNING.
  if (dagStatus.getState() == DAGStatus.State.INITING
      || dagStatus.getState() == DAGStatus.State.SUBMITTED) {
    // Saturate instead of overflowing: with a very large timeout (e.g. Long.MAX_VALUE) the plain
    // sum wraps to a negative deadline, which would make the loop below exit immediately.
    long timeoutAbsolute;
    if (timeout > 0 && startTime > Long.MAX_VALUE - timeout) {
      timeoutAbsolute = Long.MAX_VALUE;
    } else {
      timeoutAbsolute = startTime + timeout;
    }

    while (timeout < 0
        || (timeout > 0 && timeoutAbsolute > System.currentTimeMillis())) {
      if (refreshStatus) {
        // Try fetching the state with a timeout, in case the AM is already up.
        // NOTE(review): the caller's full timeout is passed on every iteration rather than the
        // time remaining until the deadline. If getDAGStatusInternal can block for that long,
        // the overall wait may exceed the requested timeout - confirm against its contract.
        dagStatus = getDAGStatusInternal(statusOptions, timeout);
      }
      refreshStatus = true; // For the next iteration of the loop.

      if (dagStatus.getState() == DAGStatus.State.RUNNING) {
        // Refreshed status indicates that the DAG is running.
        // This status could have come from the AM or the RM - client sleep if RM, otherwise
        // send request to the AM.
        if (dagStatus.getSource() == DagStatusSource.AM) {
          // RUNNING + AM should only happen if timeout is > -1.
          // Otherwise the AM ignored the -1 value, or the AM source in the DAGStatus is invalid.
          Preconditions.checkState(timeout > -1, "Should not reach here with a timeout of -1. File a bug");
          return dagStatus;
        } else {
          // From the RM. Fall through to the Sleep.
        }
      } else if (dagStatus.getState() == DAGStatus.State.SUCCEEDED
          || dagStatus.getState() == DAGStatus.State.FAILED
          || dagStatus.getState() == DAGStatus.State.KILLED
          || dagStatus.getState() == DAGStatus.State.ERROR) {
        // Again, check if this was from the RM. If it was, try getting it from a more
        // informative source.
        if (dagStatus.getSource() == DagStatusSource.RM) {
          return getDAGStatusInternal(statusOptions, 0);
        } else {
          return dagStatus;
        }
      }

      // Sleep before checking again: a full poll interval, or whatever is left of the budget.
      long currentStatusPollInterval;
      if (timeout < 0) {
        currentStatusPollInterval = statusPollInterval;
      } else {
        long remainingTimeout = timeoutAbsolute - System.currentTimeMillis();
        if (remainingTimeout < 0) {
          // Timeout expired. Return the latest known dag status.
          return dagStatus;
        } else {
          currentStatusPollInterval = Math.min(remainingTimeout, statusPollInterval);
        }
      }
      try {
        Thread.sleep(currentStatusPollInterval);
      } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt status when it throws; restore it so code further
        // up the stack can still observe that this thread was interrupted.
        Thread.currentThread().interrupt();
        throw new TezException(e);
      }
    } // End of while

    // Timeout may have expired before a single refresh
    if (refreshStatus) {
      return getDAGStatus(statusOptions);
    } else {
      return dagStatus;
    }
  } else { // Already running, or complete. Fallback to regular dagStatus with a timeout.
    return getDAGStatusInternal(statusOptions, timeout);
  }
}