in twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java [624:677]
private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
Queue<ProvisionRequest> provisioning) {
for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
ProvisionRequest provisionRequest = provisioning.peek();
if (provisionRequest == null) {
continue;
}
String runnableName = provisionRequest.getRuntimeSpec().getName();
LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
LOG.debug("Log level for Twill runnable {} is {}", runnableName, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
int containerCount = expectedContainers.getExpected(runnableName);
// Setup container environment variables
Map<String, String> env = new LinkedHashMap<>();
if (environments.containsKey(runnableName)) {
env.putAll(environments.get(runnableName));
}
// Override with system env
env.put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR));
env.put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER));
env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT));
env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
credentials);
TwillContainerLauncher launcher = new TwillContainerLauncher(
twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
// Need to call complete to workaround bug in YARN AMRMClient
if (provisionRequest.containerAcquired()) {
amClient.completeContainerRequest(provisionRequest.getRequestId());
}
if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName) ||
provisioning.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
provisioning.poll();
}
if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
}
}
}