in tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java [561:616]
private DAGStatus _waitForCompletionWithStatusUpdates(long timeMs,
boolean vertexUpdates,
@Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
DAGStatus dagStatus;
boolean initPrinted = false;
boolean runningPrinted = false;
double dagProgress = -1.0; // Print the first one
// monitoring
Long maxNs = timeMs >= 0 ? (System.nanoTime() + (timeMs * 1000000L)) : null;
while (true) {
try {
dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
} catch (DAGNotRunningException ex) {
return null;
}
if (!initPrinted
&& (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) {
initPrinted = true; // Print once
log("Waiting for DAG to start running");
}
if (dagStatus.getState() == DAGStatus.State.RUNNING
|| dagStatus.getState() == DAGStatus.State.SUCCEEDED
|| dagStatus.getState() == DAGStatus.State.FAILED
|| dagStatus.getState() == DAGStatus.State.KILLED
|| dagStatus.getState() == DAGStatus.State.ERROR) {
break;
}
if (maxNs != null && System.nanoTime() > maxNs) {
return null;
}
}// End of while(true)
Set<String> vertexNames = Collections.emptySet();
while (!dagStatus.isCompleted()) {
if (!runningPrinted) {
log("DAG initialized: CurrentState=Running");
runningPrinted = true;
}
if (vertexUpdates && vertexNames.isEmpty()) {
vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
}
dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
try {
dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION);
} catch (DAGNotRunningException ex) {
return null;
}
if (maxNs != null && System.nanoTime() > maxNs) {
return null;
}
}// end of while
// Always print the last status irrespective of progress change
monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
log("DAG completed. " + "FinalState=" + dagStatus.getState());
return dagStatus;
}