in twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java [304:378]
public TwillController start() {
  // Prepares and starts the controller for this Twill application.
  //
  // A process launcher for the application master (AM) is obtained from the YARN client, then a
  // submit task is built that, when called, stages every file the AM needs, assembles the AM
  // environment and command line, and launches the AM container. The task is handed to the
  // controller factory; this method itself only creates and starts the controller.
  // NOTE(review): the task is presumably invoked by the controller during startup - confirm
  // against the controller implementation.
  //
  // Any exception raised while setting this up is logged and rethrown via Throwables.propagate.
  try {
    final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
    final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo();
    Callable<ProcessController<YarnApplicationReport>> submitTask =
      new Callable<ProcessController<YarnApplicationReport>>() {
        @Override
        public ProcessController<YarnApplicationReport> call() throws Exception {
          String fsUser = locationFactory.getHomeLocation().getName();

          // Local files needed by AM
          Map<String, LocalFile> localFiles = Maps.newHashMap();
          // Local files declared by runnables
          Multimap<String, LocalFile> runnableLocalFiles = HashMultimap.create();

          // Each helper below receives the localFiles map; the order of these calls is kept
          // exactly as is because later steps are handed the collections filled by earlier ones.
          createAppMasterJar(createBundler(), localFiles);
          createContainerJar(createBundler(), localFiles);
          populateRunnableLocalFiles(twillSpec, runnableLocalFiles);
          saveSpecification(twillSpec, runnableLocalFiles, localFiles);
          saveLogback(localFiles);
          saveLauncher(localFiles);
          saveJvmOptions(localFiles);
          saveArguments(new Arguments(arguments, runnableArgs), localFiles);
          saveEnvironments(localFiles);
          // NOTE(review): the set presumably names the subset of localFiles to record - verify
          // against saveLocalFiles.
          saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
                                                     Constants.Files.LOGBACK_TEMPLATE,
                                                     Constants.Files.CONTAINER_JAR,
                                                     Constants.Files.LAUNCHER_JAR,
                                                     Constants.Files.ARGUMENTS));

          LOG.debug("Submit AM container spec: {}", appMasterInfo);

          // Configuration.get(name) returns null when the property is unset, and
          // ImmutableMap.Builder.put rejects null values with a NullPointerException.
          // Fall back to YARN's own default so an unset property does not abort the submission.
          String rmSchedulerAddress = yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
                                                     YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);

          // Environment variables passed to the AM container.
          ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
            .put(EnvKeys.TWILL_FS_USER, fsUser)
            .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
            .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString)
            .put(EnvKeys.TWILL_RUN_ID, runId.getId())
            .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
            .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
            .put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, rmSchedulerAddress);

          LOG.debug("Log level is set to {} for the Twill application.", logLevel);
          builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());

          // Value used for the AM's -Xmx option, derived from the AM container memory.
          int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
                                                    Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);

          // Resulting AM command line:
          //   $JAVA_HOME/bin/java -Djava.io.tmpdir=tmp
          //     -Dyarn.appId=$<EnvKeys.YARN_APP_ID_STR> -Dtwill.app=$<EnvKeys.TWILL_APP_NAME>
          //     -cp <Constants.Files.LAUNCHER_JAR>:$HADOOP_CONF_DIR -Xmx<memory>m <extraOptions>
          //     org.apache.twill.internal.TwillLauncher
          //     <Constants.Files.APP_MASTER_JAR>
          //     org.apache.twill.internal.appmaster.ApplicationMasterMain
          //     false
          return launcher.prepareLaunch(builder.build(), localFiles.values(), credentials)
            .addCommand(
              "$JAVA_HOME/bin/java",
              "-Djava.io.tmpdir=tmp",
              "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
              "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
              "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
              "-Xmx" + memory + "m",
              // An empty string is passed when no extra JVM options were configured.
              extraOptions == null ? "" : extraOptions,
              TwillLauncher.class.getName(),
              Constants.Files.APP_MASTER_JAR,
              ApplicationMasterMain.class.getName(),
              Boolean.FALSE.toString())
            .launch();
        }
      };

    YarnTwillController controller = controllerFactory.create(runId, logHandlers, submitTask);
    controller.start();
    return controller;
  } catch (Exception e) {
    LOG.error("Failed to submit application {}", twillSpec.getName(), e);
    throw Throwables.propagate(e);
  }
}