private Runnable createStatusPollingRunnable()

in twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java [234:288]


  /**
   * Creates the task that polls the Yarn application report until there is something to act on.
   *
   * <p>Each round first checks the final application status in the latest report, then checks
   * whether the instance node exists in ZooKeeper, and otherwise sleeps one second before fetching
   * a new report. The loop ends in one of three ways:
   * <ul>
   *   <li>the final application status is no longer {@code UNDEFINED}: {@code forceShutDown()}
   *       is called;</li>
   *   <li>the instance node exists: {@code statusPollingThread} is cleared and
   *       {@code watchInstanceNode()} is called, both while holding the controller's monitor;</li>
   *   <li>the running thread is interrupted: the task returns without doing either.</li>
   * </ul>
   *
   * @return a {@link Runnable} performing the polling described above
   */
  private Runnable createStatusPollingRunnable() {
    return new Runnable() {

      @Override
      public void run() {
        YarnApplicationReport latestReport = processController.getReport();
        final ApplicationId appId = latestReport.getApplicationId();
        boolean appCompleted = false;
        boolean instanceNodePresent = false;

        try {
          LOG.debug("Polling status from Yarn for {} {}.", appName, appId);
          while (!Thread.currentThread().isInterrupted()) {
            // Any final status other than UNDEFINED means the application has finished.
            appCompleted = latestReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED;
            if (appCompleted) {
              break;
            }

            // Once the instance node shows up, polling is no longer needed; go back to watching it.
            instanceNodePresent = instanceNodeExists();
            if (instanceNodePresent) {
              break;
            }

            TimeUnit.SECONDS.sleep(1);
            latestReport = processController.getReport();
          }
        } catch (InterruptedException e) {
          // Interruption only ends the polling; both flags are still false here, so nothing
          // below acts on it.
          LOG.debug("Status polling thread interrupted for application {} {}", appName, appId);
        }

        LOG.debug("Stop polling status from Yarn for {} {}.", appName, appId);

        if (appCompleted) {
          LOG.info("Yarn application {} {} completed. Shutting down controller.", appName, appId);
          forceShutDown();
          return;
        }
        if (!instanceNodePresent) {
          // Interrupted before either condition was met; nothing more to do.
          return;
        }

        LOG.info("Rewatch instance node for {} {} at {}", appName, appId, getInstancePath());
        synchronized (YarnTwillController.this) {
          statusPollingThread = null;
          watchInstanceNode();
        }
      }

      /**
       * Makes a blocking exists call on the instance node in ZooKeeper.
       *
       * <p>A failed or timed-out call is logged at debug level and reported as "not present",
       * so that it never disturbs the status polling.
       *
       * @return {@code true} if the exists call returned a non-null result, {@code false} otherwise
       * @throws InterruptedException if the thread is interrupted while waiting for the result
       */
      private boolean instanceNodeExists() throws InterruptedException {
        try {
          // The 5 second timeout is arbitrary; it only prevents blocking forever.
          return zkClient.exists(getInstancePath()).get(5, TimeUnit.SECONDS) != null;
        } catch (ExecutionException e) {
          LOG.debug("Failed in exists call on ZK path {}.", getInstancePath(), e);
        } catch (TimeoutException e) {
          LOG.debug("Timeout in exists call on ZK path {}.", getInstancePath(), e);
        }
        return false;
      }
    };
  }