in twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java [234:288]
/**
 * Creates the task executed by the status polling thread.
 *
 * <p>The task repeatedly fetches the application report from {@code processController}, sleeping one
 * second between fetches, until one of the following happens:
 * <ul>
 *   <li>The report's final application status is no longer {@code FinalApplicationStatus.UNDEFINED};
 *       the controller is then shut down through {@code forceShutDown()}.</li>
 *   <li>An exists call on the instance path in ZooKeeper returns a non-null result;
 *       {@code statusPollingThread} is then cleared and {@code watchInstanceNode()} is invoked while
 *       holding the controller's monitor.</li>
 *   <li>The polling thread is interrupted; the task then ends without taking any further action.</li>
 * </ul>
 *
 * @return the polling task
 */
private Runnable createStatusPollingRunnable() {
  return new Runnable() {
    @Override
    public void run() {
      YarnApplicationReport latestReport = processController.getReport();
      final ApplicationId applicationId = latestReport.getApplicationId();
      boolean appFinished = false;
      boolean instanceNodeFound = false;

      LOG.debug("Polling status from Yarn for {} {}.", appName, applicationId);
      try {
        while (!Thread.currentThread().isInterrupted()) {
          // Any final status other than UNDEFINED ends the polling and triggers a shutdown below.
          appFinished = latestReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED;
          if (appFinished) {
            break;
          }

          // Synchronously check for the instance node; if it is there, stop polling and re-watch it.
          try {
            // The 5 second limit is arbitrary; it only prevents this call from blocking forever.
            instanceNodeFound = zkClient.exists(getInstancePath()).get(5, TimeUnit.SECONDS) != null;
            if (instanceNodeFound) {
              break;
            }
          } catch (ExecutionException e) {
            // A failed exists call does not affect status polling, so it is only logged.
            LOG.debug("Failed in exists call on ZK path {}.", getInstancePath(), e);
          } catch (TimeoutException e) {
            LOG.debug("Timeout in exists call on ZK path {}.", getInstancePath(), e);
          }

          TimeUnit.SECONDS.sleep(1);
          latestReport = processController.getReport();
        }
      } catch (InterruptedException e) {
        // Deliberately ignored: an interrupt just ends the polling.
        LOG.debug("Status polling thread interrupted for application {} {}", appName, applicationId);
      }
      LOG.debug("Stop polling status from Yarn for {} {}.", appName, applicationId);

      if (appFinished) {
        LOG.info("Yarn application {} {} completed. Shutting down controller.", appName, applicationId);
        forceShutDown();
        return;
      }
      if (!instanceNodeFound) {
        // Polling ended because of an interrupt; there is nothing left to do.
        return;
      }

      LOG.info("Rewatch instance node for {} {} at {}", appName, applicationId, getInstancePath());
      synchronized (YarnTwillController.this) {
        statusPollingThread = null;
        watchInstanceNode();
      }
    }
  };
}