in tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java [322:592]
/**
 * Builds the YARN {@link ApplicationSubmissionContext} used to launch the Tez
 * ApplicationMaster (AM) for {@code appId}.
 *
 * <p>In addition to assembling the context, the code below has these side effects:
 * <ul>
 *   <li>calls {@code ensureStagingDirExists} on the Tez base staging path;</li>
 *   <li>writes the merged configuration and the session-jars / AM-resources listing as
 *       protobuf files under the per-application system staging path, and, when
 *       {@code dag} is non-null, the binary DAG plan (plus a text plan when the AM log
 *       level check passes);</li>
 *   <li>passes {@code sessionCreds} to {@code TokenCache.obtainTokensForFileSystems} and,
 *       when {@code dag} is non-null, to {@code setupDAGCredentials};</li>
 *   <li>when {@code dag} is non-null, mutates every vertex (task local files, task
 *       environment, default launch command opts) and sets
 *       {@code TEZ_AM_PLAN_REMOTE_PATH} on {@code amConfig.getTezConfiguration()}.</li>
 * </ul>
 *
 * @param conf configuration used for the staging paths, filesystem tokens, default
 *             AM/task environments and as an input to the final AM configuration
 * @param appId application id; its string form is used to derive the system staging path
 * @param dag DAG to submit, or {@code null}; when {@code null} the session-mode CLI
 *            option is appended to the AM command and no DAG plan is written
 * @param amName application name set on the returned context
 * @param amConfig supplies AM resource sizes, launch opts, log level, environment,
 *                 local resources, credentials and queue name
 * @param tezJarResources local resources added to the AM container, to the session-jars
 *                        listing and to each vertex; dereferenced unconditionally, so it
 *                        must not be {@code null}
 * @param sessionCreds session credentials; must not be {@code null} (checked first)
 * @return the populated submission context
 * @throws IOException declared by this method; presumably raised by the staging
 *                     directory, token and file-writing calls below
 * @throws YarnException declared by this method; the throwing call is not identifiable
 *                       from this block alone
 */
static ApplicationSubmissionContext createApplicationSubmissionContext(
TezConfiguration conf, ApplicationId appId, DAG dag, String amName,
AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
Credentials sessionCreds)
throws IOException, YarnException{
// Fail fast: sessionCreds is mutated below and merged into the AM launch credentials.
Preconditions.checkNotNull(sessionCreds);
FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
TezCommonUtils.getTezBaseStagingPath(conf));
String strAppId = appId.toString();
// All files written by this method live under this per-application path.
Path tezSysStagingPath = TezCommonUtils.createTezSystemStagingPath(conf, strAppId);
Path binaryConfPath = TezCommonUtils.getTezConfStagingPath(tezSysStagingPath);
binaryConfPath = fs.makeQualified(binaryConfPath);
// Setup resource requirements (memory and vcores are read from the AM's own configuration)
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(
amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
capability.setVirtualCores(
amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
if (LOG.isDebugEnabled()) {
LOG.debug("AppMaster capability = " + capability);
}
// Setup required Credentials for the AM launch. DAG specific credentials
// are handled separately.
ByteBuffer securityTokens = null;
// Setup security tokens
Credentials amLaunchCredentials = new Credentials();
if (amConfig.getCredentials() != null) {
amLaunchCredentials.addAll(amConfig.getCredentials());
}
// Add Staging dir creds to the list of session credentials.
TokenCache.obtainTokensForFileSystems(sessionCreds, new Path[] {binaryConfPath}, conf);
// Add session specific credentials to the AM credentials.
amLaunchCredentials.mergeAll(sessionCreds);
// Serialize the AM launch credentials now. Anything added to sessionCreds after this
// point (e.g. by setupDAGCredentials below) is not part of securityTokens.
DataOutputBuffer dob = new DataOutputBuffer();
amLaunchCredentials.writeTokenStorageToStream(dob);
securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// Need to set credentials based on DAG and the URIs which have been set for the DAG.
if (dag != null) {
setupDAGCredentials(dag, sessionCreds, conf);
}
// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
String amOpts = amConfig.getTezConfiguration().get(
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
// The helper receives the configured opts, the AM capability and the max-heap fraction;
// its exact policy is defined elsewhere in this class.
amOpts = maybeAddDefaultMemoryJavaOpts(amOpts, capability,
amConfig.getTezConfiguration().getDouble(TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION,
TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT));
vargs.add(amOpts);
String amLogLevel = amConfig.getTezConfiguration().get(
TezConfiguration.TEZ_AM_LOG_LEVEL,
TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
maybeAddDefaultLoggingJavaOpts(amLogLevel, vargs);
// FIX sun bug mentioned in TEZ-327
vargs.add("-Dsun.nio.ch.bugLevel=''");
vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
// No DAG supplied: launch the AM with the session-mode flag.
if (dag == null) {
vargs.add("--" + TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
}
// Redirect the AM's stdout/stderr into the container log directory.
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
File.separator + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
File.separator + ApplicationConstants.STDERR);
// The container receives a single-element command list holding the whole command line.
Vector<String> vargsFinal = new Vector<String>(8);
// Final command: every argument is followed by one space, including the last one.
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
}
vargsFinal.add(mergedCommand.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Command to launch container for ApplicationMaster is : "
+ mergedCommand);
}
// AM environment: defaults derived from conf first, then entries from amConfig.
Map<String, String> environment = new TreeMap<String, String>();
TezYARNUtils.setupDefaultEnv(environment, conf, TezConfiguration.TEZ_AM_LAUNCH_ENV,
TezConfiguration.TEZ_AM_LAUNCH_ENV_DEFAULT);
// finally apply env set in the code. This could potentially be removed in
// TEZ-692
if (amConfig.getEnv() != null) {
for (Map.Entry<String, String> entry : amConfig.getEnv().entrySet()) {
TezYARNUtils.addToEnvironment(environment, entry.getKey(),
entry.getValue(), File.pathSeparator);
}
}
// Local resources for the AM container. Insertion order matters for duplicate keys:
// tezJarResources entries overwrite same-named entries from amConfig.
Map<String, LocalResource> localResources =
new TreeMap<String, LocalResource>();
// Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials.
if (amConfig.getLocalResources() != null) {
localResources.putAll(amConfig.getLocalResources());
}
localResources.putAll(tezJarResources);
// emit conf as PB file
Configuration finalTezConf = createFinalTezConfForApp(conf,
amConfig.getTezConfiguration());
FSDataOutputStream amConfPBOutBinaryStream = null;
try {
// Copy every key/value pair of the final configuration into the protobuf message.
ConfigurationProto.Builder confProtoBuilder =
ConfigurationProto.newBuilder();
Iterator<Entry<String, String>> iter = finalTezConf.iterator();
while (iter.hasNext()) {
Entry<String, String> entry = iter.next();
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
kvp.setKey(entry.getKey());
kvp.setValue(entry.getValue());
confProtoBuilder.addConfKeyValues(kvp);
}
//binary output
amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath);
confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);
} finally {
// Close even when building or writing failed; the stream is null if creation failed.
if(amConfPBOutBinaryStream != null){
amConfPBOutBinaryStream.close();
}
}
// Register the configuration file just written as an AM local resource.
LocalResource binaryConfLRsrc =
TezClientUtils.createLocalResource(fs,
binaryConfPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION);
localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
binaryConfLRsrc);
// Create Session Jars definition to be sent to AM as a local resource
Path sessionJarsPath = TezCommonUtils.getTezSessionJarStagingPath(tezSysStagingPath);
FSDataOutputStream sessionJarsPBOutStream = null;
try {
// First delimited message: the tez jars plus the binary configuration resource.
Map<String, LocalResource> sessionJars =
new HashMap<String, LocalResource>(tezJarResources.size() + 1);
sessionJars.putAll(tezJarResources);
sessionJars.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
binaryConfLRsrc);
DAGProtos.PlanLocalResourcesProto proto =
DagTypeConverters.convertFromLocalResources(sessionJars);
sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs, sessionJarsPath);
proto.writeDelimitedTo(sessionJarsPBOutStream);
// Write out the initial list of resources which will be available in the AM
// (second delimited message in the same file). The branch is chosen by whether amConfig
// has any local resources, but the map converted is localResources, which at this point
// holds the amConfig resources, the tez jars and the binary conf entry.
DAGProtos.PlanLocalResourcesProto amResourceProto;
if (amConfig.getLocalResources() != null && !amConfig.getLocalResources().isEmpty()) {
amResourceProto = DagTypeConverters.convertFromLocalResources(localResources);
} else {
amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance();
}
amResourceProto.writeDelimitedTo(sessionJarsPBOutStream);
} finally {
if (sessionJarsPBOutStream != null) {
sessionJarsPBOutStream.close();
}
}
// Register the session-jars file itself as an AM local resource. This entry is added
// after the listing above was written, so it is not part of that listing.
LocalResource sessionJarsPBLRsrc =
TezClientUtils.createLocalResource(fs,
sessionJarsPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION);
localResources.put(
TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME,
sessionJarsPBLRsrc);
// DAG-specific setup: only performed when a DAG is submitted together with the AM.
if(dag != null) {
for (Vertex v : dag.getVertices()) {
// NOTE(review): tezJarResources was already dereferenced unconditionally above
// (putAll and size()), so a null value would have thrown before reaching this check.
if (tezJarResources != null) {
v.getTaskLocalFiles().putAll(tezJarResources);
}
v.getTaskLocalFiles().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
binaryConfLRsrc);
// The vertex's own environment map is passed to the helper and updated in place.
Map<String, String> taskEnv = v.getTaskEnvironment();
TezYARNUtils.setupDefaultEnv(taskEnv, conf,
TezConfiguration.TEZ_TASK_LAUNCH_ENV,
TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT);
TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
}
// emit protobuf DAG file style
Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
if (LOG.isDebugEnabled()) {
LOG.debug("Stage directory information for AppId :" + appId + " tezSysStagingPath :"
+ tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :"
+ sessionJarsPath + " binaryPlanPath :" + binaryPath);
}
// This assignment happens after the configuration file was written above, so it is not
// reflected in that file's contents.
amConfig.getTezConfiguration().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
binaryPath.toUri().toString());
// The plan is created after the vertices were updated in the loop above.
DAGPlan dagPB = dag.createDag(null);
FSDataOutputStream dagPBOutBinaryStream = null;
try {
//binary output
dagPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryPath);
dagPB.writeTo(dagPBOutBinaryStream);
} finally {
if(dagPBOutBinaryStream != null){
dagPBOutBinaryStream.close();
}
}
localResources.put(TezConfiguration.TEZ_PB_PLAN_BINARY_NAME,
TezClientUtils.createLocalResource(fs,
binaryPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION));
// Also ship a text form of the plan when the AM log level passes this check
// (presumably log4j's Level, i.e. DEBUG or more verbose — confirm).
if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath);
localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
TezClientUtils.createLocalResource(fs,
textPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION));
}
}
// No application ACLs are configured here; an empty map is passed to the container context.
Map<ApplicationAccessType, String> acls
= new HashMap<ApplicationAccessType, String>();
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(localResources, environment,
vargsFinal, null, securityTokens, acls);
// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext = Records
.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
appContext.setApplicationId(appId);
appContext.setResource(capability);
// The queue is only set when amConfig names one; otherwise the record's value is left as is.
if (amConfig.getQueueName() != null) {
appContext.setQueue(amConfig.getQueueName());
}
appContext.setApplicationName(amName);
appContext.setCancelTokensWhenComplete(amConfig.getTezConfiguration().getBoolean(
TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
appContext.setAMContainerSpec(amContainer);
// Unlike the settings above, max attempts is read from the merged final configuration.
appContext.setMaxAppAttempts(
finalTezConf.getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
return appContext;
}