in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java [465:746]
public boolean init(String[] args) throws ParseException, IOException {
  // Parses the application master's command line and process environment
  // and stores the results in this instance's fields.
  //
  // Returns false right after printing usage when "-help" is given, and
  // true once every option below has been read.
  //
  // Throws IllegalArgumentException when no args are given, when the
  // application attempt id cannot be determined, when neither a shell
  // command file nor a script location is available, when zero containers
  // are requested, or when container_type, container_retry_policy or the
  // shell script metadata in the environment is invalid.
  // Throws RuntimeException when one of the required submit-time /
  // NodeManager environment variables is missing.
  // Numeric options go through Integer.parseInt / Long.parseLong, so a
  // malformed number surfaces as NumberFormatException.

  // ---- Option definitions -------------------------------------------------
  Options opts = new Options();
  opts.addOption("appname", true,
      "Application Name. Default value - DistributedShell");
  opts.addOption("app_attempt_id", true,
      "App Attempt ID. Not to be used unless for testing purposes");
  opts.addOption("shell_env", true,
      "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("container_type", true,
      "Container execution type, GUARANTEED or OPPORTUNISTIC");
  opts.addOption("promote_opportunistic_after_start", false,
      "Flag to indicate whether to automatically promote opportunistic"
          + " containers to guaranteed.");
  opts.addOption("enforce_execution_type", false,
      "Flag to indicate whether to enforce execution type of containers");
  opts.addOption("container_memory", true,
      "Amount of memory in MB to be requested to run the shell command");
  opts.addOption("container_vcores", true,
      "Amount of virtual cores to be requested to run the shell command");
  opts.addOption("container_resources", true,
      "Amount of resources to be requested to run the shell command. " +
          "Specified as resource type=value pairs separated by commas. " +
          "E.g. -container_resources memory-mb=512,vcores=1");
  opts.addOption("container_resource_profile", true,
      "Resource profile to be requested to run the shell command");
  opts.addOption("num_containers", true,
      "No. of containers on which the shell command needs to be executed");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("container_retry_policy", true,
      "Retry policy when container fails to run, "
          + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
          + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
  opts.addOption("container_retry_error_codes", true,
      "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
          + "codes is specified with this option, "
          + "e.g. --container_retry_error_codes 1,2,3");
  opts.addOption("container_max_retries", true,
      "If container could retry, it specifies max retires");
  opts.addOption("container_retry_interval", true,
      "Interval between each retry, unit is milliseconds");
  opts.addOption("container_failures_validity_interval", true,
      "Failures which are out of the time window will not be added to"
          + " the number of container retry attempts");
  opts.addOption("placement_spec", true, "Placement specification");
  opts.addOption("debug", false, "Dump out debug information");
  opts.addOption("keep_containers_across_application_attempts", false,
      "Flag to indicate whether to keep containers across application "
          + "attempts."
          + " If the flag is true, running containers will not be killed when"
          + " application attempt fails and these containers will be "
          + "retrieved by"
          + " the new application attempt ");
  opts.addOption("localized_files", true, "List of localized files");
  opts.addOption("homedir", true, "Home Directory of Job Owner");
  opts.addOption("help", false, "Print usage");

  CommandLine cliParser = new GnuParser().parse(opts, args);

  if (args.length == 0) {
    printUsage(opts);
    throw new IllegalArgumentException(
        "No args specified for application master to initialize");
  }

  // Check whether customer log4j.properties file exists
  if (fileExist(log4jPath)) {
    try {
      Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
          log4jPath);
    } catch (Exception e) {
      // Best effort: initialisation continues without the custom logging
      // configuration. The exception is passed as the throwable argument so
      // its stack trace is kept in the log instead of only its toString().
      LOG.warn("Can not set up custom log4j properties. ", e);
    }
  }

  appName = cliParser.getOptionValue("appname", "DistributedShell");

  if (cliParser.hasOption("help")) {
    printUsage(opts);
    return false;
  }

  if (cliParser.hasOption("debug")) {
    dumpOutDebugInfo();
  }

  // Without -homedir the home directory is derived from the USER
  // environment variable.
  homeDirectory = cliParser.hasOption("homedir") ?
      new Path(cliParser.getOptionValue("homedir")) :
      new Path("/user/" + System.getenv(ApplicationConstants.
          Environment.USER.name()));

  // ---- Placement specification --------------------------------------------
  if (cliParser.hasOption("placement_spec")) {
    String placementSpec = cliParser.getOptionValue("placement_spec");
    String decodedSpec = getDecodedPlacementSpec(placementSpec);
    LOG.info("Placement Spec received [{}]", decodedSpec);

    // The total is reset here and re-checked after parsePlacementSpecs();
    // presumably that helper (defined elsewhere) accumulates it - confirm.
    this.numTotalContainers = 0;
    int globalNumOfContainers = Integer
        .parseInt(cliParser.getOptionValue("num_containers", "0"));
    parsePlacementSpecs(decodedSpec, globalNumOfContainers);
    LOG.info("Total num containers requested [{}]", numTotalContainers);

    if (numTotalContainers == 0) {
      throw new IllegalArgumentException(
          "Cannot run distributed shell with no containers");
    }
  }

  // ---- Application attempt id and required environment --------------------
  Map<String, String> envs = System.getenv();

  if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
    // No container id in the environment: fall back to the explicit
    // app_attempt_id option.
    // NOTE(review): appId is only assigned on the CONTAINER_ID path below;
    // confirm nothing needs it when app_attempt_id is used instead.
    if (cliParser.hasOption("app_attempt_id")) {
      String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
      appAttemptID = ApplicationAttemptId.fromString(appIdStr);
    } else {
      throw new IllegalArgumentException(
          "Application Attempt Id not set in the environment");
    }
  } else {
    ContainerId containerId = ContainerId.fromString(envs
        .get(Environment.CONTAINER_ID.name()));
    appAttemptID = containerId.getApplicationAttemptId();
    appId = appAttemptID.getApplicationId();
  }

  if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
    throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
        + " not set in the environment");
  }
  if (!envs.containsKey(Environment.NM_HOST.name())) {
    throw new RuntimeException(Environment.NM_HOST.name()
        + " not set in the environment");
  }
  if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
    throw new RuntimeException(Environment.NM_HTTP_PORT.name()
        + " not set in the environment");
  }
  if (!envs.containsKey(Environment.NM_PORT.name())) {
    throw new RuntimeException(Environment.NM_PORT.name()
        + " not set in the environment");
  }

  LOG.info("Application master for app" + ", appId="
      + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
      + appAttemptID.getApplicationId().getClusterTimestamp()
      + ", attemptId=" + appAttemptID.getAttemptId());

  // ---- Shell command, arguments and environment ---------------------------
  // The script location variable may be absent altogether, in which case
  // Map.get returns null; treat null and empty alike so the intended
  // IllegalArgumentException is raised rather than a NullPointerException.
  String scriptLocation =
      envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
  if (!fileExist(shellCommandPath)
      && (scriptLocation == null || scriptLocation.isEmpty())) {
    throw new IllegalArgumentException(
        "No shell command or shell script specified to be executed by"
            + " application master");
  }

  if (fileExist(shellCommandPath)) {
    shellCommand = readContent(shellCommandPath);
  }

  if (fileExist(shellArgsPath)) {
    shellArgs = readContent(shellArgsPath);
  }

  if (cliParser.hasOption("shell_env")) {
    String[] shellEnvs = cliParser.getOptionValues("shell_env");
    for (String rawEnv : shellEnvs) {
      String env = rawEnv.trim();
      int index = env.indexOf('=');
      if (index == -1) {
        // "KEY" with no '=' is stored with an empty value.
        shellEnv.put(env, "");
        continue;
      }
      // Split on the first '=' only; substring(index + 1) is "" when '='
      // is the last character.
      String key = env.substring(0, index);
      String val = env.substring(index + 1);
      shellEnv.put(key, val);
    }
  }

  // ---- Shell script metadata from the environment -------------------------
  if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
    scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);

    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
      shellScriptPathTimestamp = Long.parseLong(envs
          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
    }
    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
      shellScriptPathLen = Long.parseLong(envs
          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
    }
    // A non-empty script path must come with a positive timestamp and
    // length.
    if (!scriptPath.isEmpty()
        && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
      LOG.error("Illegal values in env for shell script path" + ", path="
          + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
          + shellScriptPathTimestamp);
      throw new IllegalArgumentException(
          "Illegal values in env for shell script path");
    }
  }

  if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
    domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
  }

  // ---- Container execution type and resources -----------------------------
  if (cliParser.hasOption("container_type")) {
    String containerTypeStr = cliParser.getOptionValue("container_type");
    // Validate first so an unknown name is reported with this message.
    if (Arrays.stream(ExecutionType.values()).noneMatch(
        executionType -> executionType.toString()
            .equals(containerTypeStr))) {
      throw new IllegalArgumentException("Invalid container_type: "
          + containerTypeStr);
    }
    containerType = ExecutionType.valueOf(containerTypeStr);
  }
  if (cliParser.hasOption("promote_opportunistic_after_start")) {
    autoPromoteContainers = true;
  }
  if (cliParser.hasOption("enforce_execution_type")) {
    enforceExecType = true;
  }

  // -1 is stored when container_memory / container_vcores are not given.
  containerMemory = Integer.parseInt(cliParser.getOptionValue(
      "container_memory", "-1"));
  containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
      "container_vcores", "-1"));

  containerResources = new HashMap<>();
  if (cliParser.hasOption("container_resources")) {
    Map<String, Long> resources = Client.parseResourcesString(
        cliParser.getOptionValue("container_resources"));
    for (Map.Entry<String, Long> entry : resources.entrySet()) {
      containerResources.put(entry.getKey(), entry.getValue());
    }
  }

  containerResourceProfile =
      cliParser.getOptionValue("container_resource_profile", "");
  keepContainersAcrossAttempts = cliParser.hasOption(
      "keep_containers_across_application_attempts");

  // num_containers is only read here when no placement specs are set.
  if (this.placementSpecs == null) {
    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
        "num_containers", "1"));
  }

  if (numTotalContainers == 0) {
    throw new IllegalArgumentException(
        "Cannot run distributed shell with no containers");
  }

  requestPriority = Integer.parseInt(cliParser
      .getOptionValue("priority", "0"));

  // ---- Container retry settings -------------------------------------------
  // The option value is an index into ContainerRetryPolicy.values(); reject
  // out-of-range indices with a descriptive error instead of letting the
  // array access fail with ArrayIndexOutOfBoundsException.
  int retryPolicyIndex = Integer.parseInt(cliParser.getOptionValue(
      "container_retry_policy", "0"));
  ContainerRetryPolicy[] retryPolicies = ContainerRetryPolicy.values();
  if (retryPolicyIndex < 0 || retryPolicyIndex >= retryPolicies.length) {
    throw new IllegalArgumentException("Invalid container_retry_policy: "
        + retryPolicyIndex);
  }
  containerRetryPolicy = retryPolicies[retryPolicyIndex];

  if (cliParser.hasOption("container_retry_error_codes")) {
    containerRetryErrorCodes = new HashSet<>();
    for (String errorCode :
        cliParser.getOptionValue("container_retry_error_codes").split(",")) {
      // Tolerate whitespace around the commas, e.g. "1, 2, 3".
      containerRetryErrorCodes.add(Integer.parseInt(errorCode.trim()));
    }
  }
  containerMaxRetries = Integer.parseInt(
      cliParser.getOptionValue("container_max_retries", "0"));
  containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
      "container_retry_interval", "0"));
  containerFailuresValidityInterval = Long.parseLong(
      cliParser.getOptionValue("container_failures_validity_interval", "-1"));

  // ---- Timeline service ---------------------------------------------------
  if (!YarnConfiguration.timelineServiceEnabled(conf)) {
    timelineClient = null;
    timelineV2Client = null;
    LOG.warn("Timeline service is not enabled");
  }

  // ---- Localized files ----------------------------------------------------
  if (cliParser.hasOption("localized_files")) {
    String localizedFilesArg = cliParser.getOptionValue("localized_files");
    // NOTE(review): the comma-separated branch replaces the list with a
    // fixed-size Arrays.asList view, while the single-file branch appends
    // to the existing list - confirm this asymmetry is intended.
    if (localizedFilesArg.contains(",")) {
      String[] files = localizedFilesArg.split(",");
      localizableFiles = Arrays.asList(files);
    } else {
      localizableFiles.add(localizedFilesArg);
    }
  }

  return true;
}