in tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java [345:387]
/**
 * Polls the DAG status until the DAG is no longer running, optionally reporting
 * progress along the way, and returns the last status that was fetched.
 *
 * <p>The wait has two phases. First it polls until the DAG reports RUNNING or one of
 * SUCCEEDED, FAILED, KILLED or ERROR, logging a one-time "waiting" message while the
 * DAG is INITING or SUBMITTED. Then, for as long as the DAG reports RUNNING, it
 * re-polls every {@code SLEEP_FOR_COMPLETION} milliseconds, calling
 * {@code monitorProgress} before each sleep when {@code vertexNames} is non-null.
 *
 * <p>An interrupt received while sleeping does not abort the wait: polling continues
 * until the DAG leaves the RUNNING state. The interrupt is not lost, however; the
 * calling thread's interrupt status is restored just before this method exits
 * (normally or by exception), so callers can still observe it.
 *
 * @param vertexNames vertices handed to {@code monitorProgress}; when {@code null},
 *     no progress reporting is done
 * @param statusGetOpts options passed to every {@code getDAGStatus} call and to the
 *     final {@code monitorProgress} call; may be {@code null}
 * @return the last {@link DAGStatus} fetched, i.e. the first one observed whose state
 *     was not RUNNING in the second phase
 * @throws IOException propagated from {@code getDAGStatus} / {@code monitorProgress}
 * @throws TezException propagated from {@code getDAGStatus} / {@code monitorProgress}
 */
private DAGStatus _waitForCompletionWithStatusUpdates(@Nullable Set<String> vertexNames,
    @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException {
  boolean initPrinted = false;
  // Remembers an interrupt swallowed by Thread.sleep so it can be re-asserted on exit.
  boolean interrupted = false;
  double dagProgress = -1.0; // Print the first one
  try {
    DAGStatus dagStatus;
    // Phase 1: wait for the DAG to start running (or to finish without ever running).
    while (true) {
      dagStatus = getDAGStatus(statusGetOpts);
      DAGStatus.State state = dagStatus.getState();
      if (!initPrinted
          && (state == DAGStatus.State.INITING || state == DAGStatus.State.SUBMITTED)) {
        initPrinted = true; // Print once
        log("Waiting for DAG to start running");
      }
      if (state == DAGStatus.State.RUNNING
          || state == DAGStatus.State.SUCCEEDED
          || state == DAGStatus.State.FAILED
          || state == DAGStatus.State.KILLED
          || state == DAGStatus.State.ERROR) {
        break;
      }
      try {
        Thread.sleep(SLEEP_FOR_COMPLETION);
      } catch (InterruptedException e) {
        // Keep waiting, but do not lose the interrupt. The flag is deliberately NOT
        // re-set here: doing so would make every following sleep throw immediately
        // and turn this loop into a busy spin against the status endpoint.
        interrupted = true;
      }
    }

    // Phase 2: report progress until the DAG leaves the RUNNING state.
    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
      if (vertexNames != null) {
        // Periodic updates pass null status options; only the final report below
        // passes the caller's statusGetOpts.
        dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
      }
      try {
        Thread.sleep(SLEEP_FOR_COMPLETION);
      } catch (InterruptedException e) {
        interrupted = true; // same reasoning as in phase 1
      }
      dagStatus = getDAGStatus(statusGetOpts);
    }

    if (vertexNames != null) {
      // Always print the last status irrespective of progress change
      monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
    }
    log("DAG completed. FinalState=" + dagStatus.getState());
    return dagStatus;
  } finally {
    if (interrupted) {
      // Restore the interrupt status that Thread.sleep cleared when it threw.
      Thread.currentThread().interrupt();
    }
  }
}