in twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java [69:112]
public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec,
@Nullable String schedulerQueue) throws Exception {
// Request for new application
YarnClientApplication application = yarnClient.createApplication();
final GetNewApplicationResponse response = application.getNewApplicationResponse();
final ApplicationId appId = response.getApplicationId();
// Setup the context for application submission
final ApplicationSubmissionContext appSubmissionContext = application.getApplicationSubmissionContext();
appSubmissionContext.setApplicationId(appId);
appSubmissionContext.setApplicationName(twillSpec.getName());
if (schedulerQueue != null) {
appSubmissionContext.setQueue(schedulerQueue);
}
// TODO: Make it adjustable through TwillSpec (TWILL-90)
// Set the resource requirement for AM
final Resource capability = adjustMemory(response, Resource.newInstance(Constants.APP_MASTER_MEMORY_MB, 1));
ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(),
capability.getVirtualCores());
ApplicationSubmitter submitter = new ApplicationSubmitter() {
@Override
public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context) {
ContainerLaunchContext launchContext = context.getLaunchContext();
addRMToken(launchContext);
appSubmissionContext.setAMContainerSpec(launchContext);
appSubmissionContext.setResource(capability);
appSubmissionContext.setMaxAppAttempts(2);
try {
yarnClient.submitApplication(appSubmissionContext);
return new ProcessControllerImpl(yarnClient, appId);
} catch (Exception e) {
LOG.error("Failed to submit application {}", appId, e);
throw Throwables.propagate(e);
}
}
};
return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
}