in odps-console-xflow/src/main/java/com/aliyun/openservices/odps/console/xflow/PAICommand.java [457:559]
/**
 * Polls an XFlow instance until it has terminated, reporting on its sub instances to stderr
 * on every polling pass.
 *
 * <p>For each result returned by {@code XFlows.getXResults(xInstance)}:
 * <ul>
 *   <li>a {@code SubWorkflow} node is waited on recursively;</li>
 *   <li>a {@code Local} node is skipped;</li>
 *   <li>any other node has its instance ID and logview printed once, its Sigma job
 *       information printed when present, and a progress message printed whenever
 *       {@code progressHelper} asks for one.</li>
 * </ul>
 *
 * <p>Once {@code xInstance.isTerminated()} is true, one more full pass is made before
 * returning, so that a final progress message can be printed.
 *
 * <p>An {@link OdpsException} or {@link ReloadException} raised during a pass is retried after
 * sleeping twice the polling interval; the consecutive-failure counter is reset by every
 * successful pass. Once more than {@code MAX_RETRY_TIMES} consecutive failures have occurred,
 * the last failure is rethrown immediately.
 *
 * @param progressHelper     supplies the polling interval (passed to {@link Thread#sleep(long)},
 *                           i.e. milliseconds) and the progress messages
 * @param logviewHasPrintSet keys of what has already been printed (instance IDs and
 *                           {@code "<instanceId>.<taskName>"} entries); updated by this method
 * @param xInstance          the XFlow instance to wait for
 * @param odps               client used to look up XFlow results and sub instances
 * @param context            execution context passed to the logview generator
 * @throws OdpsException          if the service keeps failing after the retries are exhausted
 * @throws ODPSConsoleException   if a console-level failure is raised while reporting
 * @throws UserInterruptException if the polling thread is interrupted while sleeping
 */
public static void waitForInstanceTerminate(XFlowProgressHelper progressHelper,
                                            Set<String> logviewHasPrintSet, Instance xInstance,
                                            Odps odps, ExecutionContext context)
    throws OdpsException, ODPSConsoleException {
  int retryTimes = 0;
  XFlows xFlows = odps.xFlows();
  int interval = progressHelper.getInterval();
  boolean terminatedFlag = false;
  while (true) {
    try {
      for (XFlows.XResult xResult : xFlows.getXResults(xInstance).values()) {
        String nodeType = xResult.getNodeType();
        if ("SubWorkflow".equalsIgnoreCase(nodeType)) {
          waitForInstanceTerminate(progressHelper, logviewHasPrintSet,
                                   odps.instances().get(xResult.getInstanceId()), odps, context);
        } else if (!"Local".equalsIgnoreCase(nodeType)) {
          Instance subInstance =
              odps.instances().get(odps.getDefaultProject(), xResult.getInstanceId());
          printSubInstanceStatus(progressHelper, logviewHasPrintSet, subInstance, odps, context);
        }
      }
      // FOR CNN ALGO, we should print one more progress message after terminated:
      // the first pass that sees termination only sets the flag, the next one exits.
      if (xInstance.isTerminated()) {
        if (terminatedFlag) {
          break;
        }
        terminatedFlag = true;
      }
      Thread.sleep(interval);
      // A complete pass succeeded, so earlier failures no longer count.
      retryTimes = 0;
    } catch (InterruptedException e) {
      throw new UserInterruptException(e.getMessage());
    } catch (OdpsException e) {
      // The service throws immediately on a 400 response, so retry here with a longer
      // pause. Give up right away once the budget is spent instead of sleeping first.
      if (++retryTimes > MAX_RETRY_TIMES) {
        throw e;
      }
      pauseBeforeRetry(retryTimes, interval);
    } catch (ReloadException e) {
      if (++retryTimes > MAX_RETRY_TIMES) {
        throw e;
      }
      pauseBeforeRetry(retryTimes, interval);
    }
  }
}

/**
 * Prints the one-time logview, any pending Sigma job information and, when requested by
 * {@code progressHelper}, a progress message for a single sub instance.
 */
private static void printSubInstanceStatus(XFlowProgressHelper progressHelper,
                                           Set<String> logviewHasPrintSet, Instance subInstance,
                                           Odps odps, ExecutionContext context)
    throws OdpsException, ODPSConsoleException {
  String instanceId = subInstance.getId();
  // Set.add returns true only for a new element, so the logview is printed once per instance.
  if (logviewHasPrintSet.add(instanceId)) {
    System.err.println("Sub Instance ID = " + instanceId);
    System.err.println(ODPSConsoleUtils.generateLogView(odps, subInstance, context));
  }
  for (String task : subInstance.getTaskNames()) {
    printSigmaJobInfo(logviewHasPrintSet, subInstance, instanceId, task);
  }
  if (progressHelper.needProgressMessage(subInstance)) {
    System.err.println(progressHelper.getProgressMessage(subInstance));
  }
}

/**
 * Prints the Sigma job name and logview found in a task's JSON summary, and records the task
 * as done in {@code logviewHasPrintSet} once its logview has been printed.
 */
private static void printSigmaJobInfo(Set<String> logviewHasPrintSet, Instance subInstance,
                                      String instanceId, String task) throws OdpsException {
  String instTask = instanceId + "." + task;
  if (logviewHasPrintSet.contains(instTask)) {
    return;
  }
  TaskSummary taskSummary = subInstance.getTaskSummary(task);
  if (taskSummary == null) {
    return;
  }
  JsonObject summaryJson = parseSummaryObject(taskSummary.getJsonSummary());
  if (summaryJson == null) {
    return;
  }
  String sigmaLogView = null;
  if (summaryJson.has("SigmaJobLogView")) {
    sigmaLogView = summaryJson.get("SigmaJobLogView").getAsString();
  }
  String sigmaJobName = null;
  if (summaryJson.has("SigmaJobName")) {
    sigmaJobName = summaryJson.get("SigmaJobName").getAsString();
  }
  if (StringUtils.isNullOrEmpty(sigmaJobName)) {
    return;
  }
  // NOTE(review): the task is only marked as printed once the logview is available, so the
  // job name line is repeated on every pass until then - confirm this is intended.
  System.err.println("Sigma Job Name = " + sigmaJobName);
  if (!StringUtils.isNullOrEmpty(sigmaLogView)) {
    System.err.println(sigmaLogView);
    logviewHasPrintSet.add(instTask);
  }
}

/**
 * Parses a task's JSON summary, returning {@code null} when the summary is null, empty, or
 * does not parse to a JSON object. An unconditional cast of the parse result would fail for
 * such summaries, so they are skipped instead.
 */
private static JsonObject parseSummaryObject(String summary) {
  if (StringUtils.isNullOrEmpty(summary)) {
    return null;
  }
  Object parsed = new JsonParser().parse(summary);
  return (parsed instanceof JsonObject) ? (JsonObject) parsed : null;
}

/**
 * Sleeps for twice the polling interval before the next retry and then reports the retry
 * count on stderr.
 *
 * @throws UserInterruptException if the thread is interrupted during the pause; the message
 *                                is taken from the interruption itself
 */
private static void pauseBeforeRetry(int retryTimes, int interval)
    throws UserInterruptException {
  try {
    Thread.sleep(2L * interval);
  } catch (InterruptedException interrupted) {
    throw new UserInterruptException(interrupted.getMessage());
  }
  System.err.println(String.format("retry %d times.", retryTimes));
}