in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java [678:1103]
/**
 * Submits the distributed shell application to YARN and monitors it.
 *
 * <p>In order, this method: starts the YARN client and logs cluster, node,
 * queue and ACL information; validates the requested resource profiles;
 * obtains a new application from the ResourceManager and caps the AM memory
 * and vcores at the cluster maximum; uploads the AM jar, optional log4j
 * properties, files to localize, shell script, shell command and shell
 * arguments to the file system; builds the AM environment and launch
 * command; attaches security and Docker credentials to the AM container;
 * submits the application; and finally delegates to
 * {@code monitorApplication}.
 *
 * @return the value returned by {@code monitorApplication(applicationId)}
 * @throws IOException if a requested resource profile is unknown or resource
 *     profiles are not enabled, if the RM principal needed as token renewer
 *     cannot be determined on a secure cluster, or if a file system
 *     operation fails
 * @throws YarnException declared for failures of the underlying YARN client
 *     calls
 * @throws IllegalArgumentException if the configured queue is not known to
 *     the scheduler
 * @throws UncheckedIOException if a file to localize does not exist, cannot
 *     be read, is a directory, or cannot be uploaded
 */
public boolean run() throws IOException, YarnException {

  LOG.info("Running Client");
  isRunning.set(true);
  yarnClient.start();
  // set the client start time.
  clientStartTime = System.currentTimeMillis();

  YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
  LOG.info("Got Cluster metric info from ASM"
      + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());

  List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
      NodeState.RUNNING);
  LOG.info("Got Cluster node info from ASM");
  for (NodeReport node : clusterNodeReports) {
    LOG.info("Got node report from ASM for"
        + ", nodeId=" + node.getNodeId()
        + ", nodeAddress=" + node.getHttpAddress()
        + ", nodeRackName=" + node.getRackName()
        + ", nodeNumContainers=" + node.getNumContainers());
  }

  // Fail fast when the target queue is unknown to the scheduler.
  QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
  if (queueInfo == null) {
    throw new IllegalArgumentException(String
        .format("Queue %s not present in scheduler configuration.",
            this.amQueue));
  }

  LOG.info("Queue info"
      + ", queueName=" + queueInfo.getQueueName()
      + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
      + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
      + ", queueApplicationCount=" + queueInfo.getApplications().size()
      + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());

  List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
  for (QueueUserACLInfo aclInfo : listAclInfo) {
    for (QueueACL userAcl : aclInfo.getUserAcls()) {
      LOG.info("User ACL Info for Queue"
          + ", queueName=" + aclInfo.getQueueName()
          + ", userAcl=" + userAcl.name());
    }
  }

  if (domainId != null && domainId.length() > 0 && toCreateDomain) {
    prepareTimelineDomain();
  }

  // A null map means the resource profile feature is disabled on the RM.
  Map<String, Resource> profiles;
  try {
    profiles = yarnClient.getResourceProfiles();
  } catch (YARNFeatureNotEnabledException re) {
    profiles = null;
  }

  // Validate every profile the user asked for (AM and containers).
  List<String> appProfiles = new ArrayList<>(2);
  appProfiles.add(amResourceProfile);
  appProfiles.add(containerResourceProfile);
  for (String appProfile : appProfiles) {
    if (appProfile != null && !appProfile.isEmpty()) {
      if (profiles == null) {
        String message = "Resource profiles is not enabled";
        LOG.error(message);
        throw new IOException(message);
      }
      if (!profiles.containsKey(appProfile)) {
        String message = "Unknown resource profile '" + appProfile
            + "'. Valid resource profiles are " + profiles.keySet();
        LOG.error(message);
        throw new IOException(message);
      }
    }
  }

  // Get a new application id
  YarnClientApplication app = yarnClient.createApplication();
  GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
  // TODO get min/max resource capabilities from RM and change memory ask if needed
  // If we do not have min/max, we may not be able to correctly request
  // the required resources from the RM for the app master
  // Memory ask has to be a multiple of min and less than max.
  // Dump out information about cluster capability as seen by the resource manager
  long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
  LOG.info("Max mem capability of resources in this cluster " + maxMem);

  // A resource ask cannot exceed the max.
  if (amMemory > maxMem) {
    LOG.info("AM memory specified above max threshold of cluster. Using max value."
        + ", specified=" + amMemory
        + ", max=" + maxMem);
    amMemory = maxMem;
  }

  int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
  LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);

  if (amVCores > maxVCores) {
    LOG.info("AM virtual cores specified above max threshold of cluster. "
        + "Using max value." + ", specified=" + amVCores
        + ", max=" + maxVCores);
    amVCores = maxVCores;
  }

  // set the application name
  ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
  applicationId = appContext.getApplicationId();

  // Set up resource type requirements
  // For now, both memory and vcores are supported, so we set memory and
  // vcores requirements
  List<ResourceTypeInfo> resourceTypes = yarnClient.getResourceTypeInfo();
  setAMResourceCapability(appContext, profiles, resourceTypes);
  setContainerResources(profiles, resourceTypes);

  appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
  appContext.setApplicationName(appName);

  if (attemptFailuresValidityInterval >= 0) {
    appContext
        .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
  }

  // Application tags: user supplied tags plus the timeline flow tags.
  Set<String> tags = new HashSet<String>();
  if (applicationTags != null) {
    tags.addAll(applicationTags);
  }
  if (flowName != null) {
    tags.add(TimelineUtils.generateFlowNameTag(flowName));
  }
  if (flowVersion != null) {
    tags.add(TimelineUtils.generateFlowVersionTag(flowVersion));
  }
  if (flowRunId != 0) {
    tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
  }
  appContext.setApplicationTags(tags);

  // set local resources for the application master
  // local files or archives as needed
  // In this scenario, the jar file for the application master is part of the local resources
  Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

  LOG.info("Copy App Master jar from local filesystem and add to local environment");
  // Copy the application master jar to the filesystem
  // Create a local resource to point to the destination jar path
  FileSystem fs = FileSystem.get(conf);
  addToLocalResources(fs, appMasterJar, appMasterJarPath,
      applicationId.toString(), localResources, null);

  // Set the log4j properties if needed
  if (!log4jPropFile.isEmpty()) {
    addToLocalResources(fs, log4jPropFile, log4jPath,
        applicationId.toString(), localResources, null);
  }

  // Process local files for localization
  // Here we just upload the files, the AM
  // will set up localization later.
  // Failures are reported as UncheckedIOException because the checks run
  // inside a lambda, which cannot throw the checked IOException.
  StringBuilder localizableFiles = new StringBuilder();
  filesToLocalize.stream().forEach(path -> {
    File f = new File(path);

    if (!f.exists()) {
      throw new UncheckedIOException(
          new IOException(path + " does not exist"));
    }

    if (!f.canRead()) {
      throw new UncheckedIOException(
          new IOException(path + " cannot be read"));
    }

    if (f.isDirectory()) {
      throw new UncheckedIOException(
          new IOException(path + " is a directory"));
    }

    try {
      String fileName = f.getName();
      uploadFile(fs, path, fileName, applicationId.toString());
      // Comma separated list of the uploaded file names.
      if (localizableFiles.length() == 0) {
        localizableFiles.append(fileName);
      } else {
        localizableFiles.append(",").append(fileName);
      }
    } catch (IOException e) {
      throw new UncheckedIOException("Cannot upload file: " + path, e);
    }
  });

  // The shell script has to be made available on the final container(s)
  // where it will be executed.
  // To do this, we need to first copy into the filesystem that is visible
  // to the yarn framework.
  // We do not need to set this as a local resource for the application
  // master as the application master does not need it.
  String hdfsShellScriptLocation = "";
  long hdfsShellScriptLen = 0;
  long hdfsShellScriptTimestamp = 0;
  if (!shellScriptPath.isEmpty()) {
    Path shellSrc = new Path(shellScriptPath);
    String shellPathSuffix =
        ApplicationMaster.getRelativePath(appName,
            applicationId.toString(),
            SCRIPT_PATH);
    Path shellDst =
        new Path(fs.getHomeDirectory(), shellPathSuffix);
    fs.copyFromLocalFile(false, true, shellSrc, shellDst);
    hdfsShellScriptLocation = shellDst.toUri().toString();
    FileStatus shellFileStatus = fs.getFileStatus(shellDst);
    hdfsShellScriptLen = shellFileStatus.getLen();
    hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
  }

  if (!shellCommand.isEmpty()) {
    addToLocalResources(fs, null, shellCommandPath, applicationId.toString(),
        localResources, shellCommand);
  }

  if (shellArgs.length > 0) {
    addToLocalResources(fs, null, shellArgsPath, applicationId.toString(),
        localResources, StringUtils.join(shellArgs, " "));
  }

  // Set the necessary security tokens as needed
  //amContainer.setContainerTokens(containerToken);

  // Set the env variables to be setup in the env where the application master will be run
  LOG.info("Set the environment for the application master");
  Map<String, String> env = new HashMap<String, String>();

  // put location of shell script into env
  // using the env info, the application master will create the correct local resource for the
  // eventual containers that will be launched to execute the shell scripts
  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
  if (domainId != null && domainId.length() > 0) {
    env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
  }

  // Add AppMaster.jar location to classpath
  // At some point we should not be required to add
  // the hadoop specific classpaths to the env.
  // It should be provided out of the box.
  // For now setting all required classpaths including
  // the classpath to "." for the application jar
  StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
      .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
  for (String c : conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR)
        .append(c.trim());
  }
  classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
      "./log4j.properties");

  // add the runtime classpath needed for tests to work
  if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR)
        .append(System.getProperty("java.class.path"));
  }

  env.put("CLASSPATH", classPathEnv.toString());

  // Set the necessary command to execute the application master.
  // The list is only used locally, so no synchronized collection is needed.
  List<CharSequence> vargs = new ArrayList<CharSequence>(30);

  // Set java executable command
  LOG.info("Setting up app master command");
  // Need extra quote here because JAVA_HOME might contain space on Windows,
  // e.g. C:/Program Files/Java...
  vargs.add("\"" + Environment.JAVA_HOME.$$() + "/bin/java\"");
  // Set Xmx based on am memory size
  vargs.add("-Xmx" + amMemory + "m");
  // JDK17 support
  vargs.add(ApplicationConstants.JVM_ADD_OPENS_VAR);
  // Set class name
  vargs.add(appMasterMainClass);
  // Set params for Application Master
  if (containerType != null) {
    vargs.add("--container_type " + String.valueOf(containerType));
  }
  if (autoPromoteContainers) {
    vargs.add("--promote_opportunistic_after_start");
  }
  if (enforceExecType) {
    vargs.add("--enforce_execution_type");
  }
  if (containerMemory > 0) {
    vargs.add("--container_memory " + String.valueOf(containerMemory));
  }
  if (containerVirtualCores > 0) {
    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
  }
  if (!containerResources.isEmpty()) {
    Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("=");
    vargs.add("--container_resources " + joiner.join(containerResources));
  }
  if (containerResourceProfile != null && !containerResourceProfile
      .isEmpty()) {
    vargs.add("--container_resource_profile " + containerResourceProfile);
  }
  vargs.add("--num_containers " + String.valueOf(numContainers));
  if (placementSpec != null && placementSpec.length() > 0) {
    // Encode the spec to avoid passing special chars via shell arguments.
    String encodedSpec = Base64.getEncoder()
        .encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8));
    LOG.info("Encode placement spec: " + encodedSpec);
    vargs.add("--placement_spec " + encodedSpec);
  }
  if (null != nodeLabelExpression) {
    appContext.setNodeLabelExpression(nodeLabelExpression);
  }
  vargs.add("--priority " + String.valueOf(shellCmdPriority));

  if (keepContainers) {
    vargs.add("--keep_containers_across_application_attempts");
  }

  for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
    vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
  }
  if (debugFlag) {
    vargs.add("--debug");
  }
  if (localizableFiles.length() > 0) {
    vargs.add("--localized_files " + localizableFiles.toString());
  }
  vargs.add("--appname " + appName);

  vargs.add("--homedir " + fs.getHomeDirectory());

  vargs.addAll(containerRetryOptions);

  // Redirect the AM's stdout/stderr into the container log directory.
  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

  // Get final command
  StringBuilder command = new StringBuilder();
  for (CharSequence str : vargs) {
    command.append(str).append(" ");
  }

  LOG.info("Completed setting up app master command " + command.toString());
  List<String> commands = new ArrayList<String>();
  commands.add(command.toString());

  // Set up the container launch context for the application master
  ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
      localResources, env, commands, null, null, null);

  // Service data is a binary blob that can be passed to the application
  // Not needed in this scenario
  // amContainer.setServiceData(serviceData);

  // Setup security tokens
  Credentials rmCredentials = null;
  if (UserGroupInformation.isSecurityEnabled()) {
    // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
    rmCredentials = new Credentials();
    String tokenRenewer = YarnClientUtils.getRmPrincipal(conf);
    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      throw new IOException(
          "Can't get Master Kerberos principal for the RM to use as renewer");
    }

    // For now, only getting tokens for the default file-system.
    final Token<?>[] tokens =
        fs.addDelegationTokens(tokenRenewer, rmCredentials);
    if (tokens != null) {
      for (Token<?> token : tokens) {
        LOG.info("Got dt for " + fs.getUri() + "; " + token);
      }
    }
  }

  // Add the docker client config credentials if supplied.
  Credentials dockerCredentials = null;
  if (dockerClientConfig != null) {
    dockerCredentials =
        DockerClientConfigHandler.readCredentialsFromConfigFile(
            new Path(dockerClientConfig), conf, applicationId.toString());
  }

  if (rmCredentials != null || dockerCredentials != null) {
    // Merge everything into ONE Credentials object and serialize it once.
    // Each writeTokenStorageToStream call emits a complete, self-contained
    // token store; writing two of them back to back into the same buffer
    // would leave the second store invisible to any consumer that decodes
    // the buffer with a single readTokenStorageStream call, i.e. the docker
    // credentials would be dropped whenever security is enabled.
    Credentials amCredentials = new Credentials();
    if (rmCredentials != null) {
      amCredentials.addAll(rmCredentials);
    }
    if (dockerCredentials != null) {
      amCredentials.addAll(dockerCredentials);
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    amCredentials.writeTokenStorageToStream(dob);
    // Wrap only the bytes actually written, not the whole backing array.
    ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    amContainer.setTokens(tokens);
  }

  appContext.setAMContainerSpec(amContainer);

  // Set the priority for the application master
  // TODO - what is the range for priority? how to decide?
  Priority pri = Priority.newInstance(amPriority);
  appContext.setPriority(pri);

  // Set the queue to which this application is to be submitted in the RM
  appContext.setQueue(amQueue);

  specifyLogAggregationContext(appContext);

  // Submit the application to the applications manager
  // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
  // Ignore the response as either a valid response object is returned on success
  // or an exception thrown to denote some form of a failure
  LOG.info("Submitting application to ASM");

  yarnClient.submitApplication(appContext);

  // TODO
  // Try submitting the same request again
  // app submission failure?

  // Monitor the application
  return monitorApplication(applicationId);
}