protected void doRun()

in twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java [347:421]


  /**
   * Runs the main loop of the application master until {@code stopped} is set or there is
   * nothing left to do (no request in provisioning, no queued container request and no
   * running container).
   *
   * <p>Each iteration of the loop:
   * <ol>
   *   <li>calls {@code amClient.allocate(...)}; acquired launchers are passed to
   *       {@code launchRunnable} and completed container statuses to {@code handleCompleted};</li>
   *   <li>if nothing is being provisioned, picks the next request from
   *       {@code runnableContainerRequests} that is ready to be provisioned;</li>
   *   <li>submits that request through {@code addContainerRequests} after updating the blacklist;</li>
   *   <li>clears the blacklist once the outstanding request has been pending for longer than
   *       {@code Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT};</li>
   *   <li>runs the provision timeout check and, while {@code isRunning()} is true, sleeps for
   *       one second.</li>
   * </ol>
   *
   * @throws Exception if any of the invoked operations fails, or if the sleep between
   *                   iterations is interrupted
   */
  protected void doRun() throws Exception {
    // The main loop
    Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
    final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();

    // Callbacks invoked from amClient.allocate(); they share the provisioning queue with this loop.
    YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
      @Override
      public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
        launchRunnable(launchers, provisioning);
      }

      @Override
      public void completed(List<YarnContainerStatus> completed) {
        handleCompleted(completed);
      }
    };

    long requestStartTime = 0;
    boolean isRequestRelaxed = false;
    long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
    while (!stopped) {
      // Call allocate. It has to be made at first in order to be able to get cluster resource availability.
      amClient.allocate(0.0f, allocateHandler);

      // Looks for containers requests.
      if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && runningContainers.isEmpty()) {
        LOG.info("All containers completed. Shutting down application master.");
        break;
      }

      // If nothing is in provisioning, and no pending request, move to next one.
      // The scan is limited to a single pass over the queue (its size at entry). Without the bound,
      // a queue containing only requests that are not yet ready would be rotated forever in a tight
      // loop, burning CPU and preventing this method from calling allocate(), checking timeouts,
      // sleeping, or observing the stopped flag. Requests skipped here are retried on the next
      // iteration of the outer loop.
      int remaining = runnableContainerRequests.size();
      while (remaining-- > 0 && provisioning.isEmpty() && currentRequest == null
             && !runnableContainerRequests.isEmpty()) {
        RunnableContainerRequest runnableContainerRequest = runnableContainerRequests.peek();
        if (!runnableContainerRequest.isReadyToBeProvisioned()) {
          // take it out from queue and put it back at the end for second chance.
          runnableContainerRequest = runnableContainerRequests.poll();
          runnableContainerRequests.add(runnableContainerRequest);

          continue;
        }
        currentRequest = runnableContainerRequest.takeRequest();
        if (currentRequest == null) {
          // All different types of resource request from current order is done, move to next one
          // TODO: Need to handle order type as well
          runnableContainerRequests.poll();
        }
      }

      // Nothing in provision, makes the next batch of provision request
      if (provisioning.isEmpty() && currentRequest != null) {
        manageBlacklist(currentRequest);
        addContainerRequests(currentRequest.getKey().getResource(), currentRequest.getValue(), provisioning,
                             currentRequest.getKey().getType());
        currentRequest = null;
        // Remember when this batch was submitted so that its constraints can be relaxed later.
        requestStartTime = System.currentTimeMillis();
        isRequestRelaxed = false;
      }

      // Check for provision request timeout i.e. check if any provision request has been pending
      // for more than the designated time. On timeout, relax the request constraints.
      if (!provisioning.isEmpty() && !isRequestRelaxed &&
        (System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
        LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
        // Clear the blacklist for the pending provision request(s).
        amClient.clearBlacklist();
        // Relax at most once per submitted batch; reset when the next batch is submitted.
        isRequestRelaxed = true;
      }

      nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);

      // Pause between iterations, but skip the pause once the service is no longer running.
      if (isRunning()) {
        TimeUnit.SECONDS.sleep(1);
      }
    }
  }