private void launchRunnable()

in twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java [624:677]


  /**
   * Starts a runnable instance in each of the given containers, serving the request at the head of
   * the provisioning queue.
   *
   * <p>For every launcher the head of {@code provisioning} is inspected (not removed). A container
   * environment is assembled from the per-runnable environment plus Twill-specific variables, the
   * container is handed to {@code runningContainers}, and the head request is removed from the queue
   * once the runnable has reached its expected instance count or the request is of type
   * {@code ALLOCATE_ONE_INSTANCE_AT_A_TIME}.
   *
   * @param launchers launchers for the containers to start runnables in, one per container
   * @param provisioning queue of pending provision requests; its head is the request being served
   */
  private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
                              Queue<ProvisionRequest> provisioning) {
    for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
      LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
      ProvisionRequest provisionRequest = provisioning.peek();
      if (provisionRequest == null) {
        // No pending request to serve; nothing is launched in this container by this method.
        continue;
      }

      String runnableName = provisionRequest.getRuntimeSpec().getName();
      LOG.info("Starting runnable {} with {}", runnableName, processLauncher);

      LOG.debug("Log level for Twill runnable {} is {}", runnableName, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));

      // Captured before the container is started; also used for the "fully provisioned" log below.
      int containerCount = expectedContainers.getExpected(runnableName);

      // Setup container environment variables, starting from the runnable-specific ones (if any)
      Map<String, String> env = new LinkedHashMap<>();
      if (environments.containsKey(runnableName)) {
        env.putAll(environments.get(runnableName));
      }
      // Override with system env. System.getenv returns null for an unset variable, in which case
      // the key is mapped to null.
      env.put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR));
      env.put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER));
      env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
      env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
      env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
      env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT));
      env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());

      ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
                                                                                         credentials);

      TwillContainerLauncher launcher = new TwillContainerLauncher(
        twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
        ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
        containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());

      runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);

      // Need to call complete to workaround bug in YARN AMRMClient
      if (provisionRequest.containerAcquired()) {
        amClient.completeContainerRequest(provisionRequest.getRequestId());
      }

      // Evaluate once so the dequeue decision and the log statement below always agree.
      boolean fullyProvisioned =
        expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName);

      // Use the request already taken from the head of the queue instead of peeking again,
      // which avoids dereferencing a second, unchecked peek() result.
      if (fullyProvisioned ||
        provisionRequest.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
        provisioning.poll();
      }
      if (fullyProvisioned) {
        LOG.info("Runnable {} fully provisioned with {} instances.", runnableName, containerCount);
      }
    }
  }