public boolean init()

in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java [465:746]


  public boolean init(String[] args) throws ParseException, IOException {
    Options opts = new Options();
    opts.addOption("appname", true,
        "Application Name. Default value - DistributedShell");
    opts.addOption("app_attempt_id", true,
        "App Attempt ID. Not to be used unless for testing purposes");
    opts.addOption("shell_env", true,
        "Environment for shell script. Specified as env_key=env_val pairs");
    opts.addOption("container_type", true,
        "Container execution type, GUARANTEED or OPPORTUNISTIC");
    opts.addOption("promote_opportunistic_after_start", false,
        "Flag to indicate whether to automatically promote opportunistic"
            + " containers to guaranteed.");
    opts.addOption("enforce_execution_type", false,
        "Flag to indicate whether to enforce execution type of containers");
    opts.addOption("container_memory", true,
        "Amount of memory in MB to be requested to run the shell command");
    opts.addOption("container_vcores", true,
        "Amount of virtual cores to be requested to run the shell command");
    opts.addOption("container_resources", true,
        "Amount of resources to be requested to run the shell command. " +
        "Specified as resource type=value pairs separated by commas. " +
        "E.g. -container_resources memory-mb=512,vcores=1");
    opts.addOption("container_resource_profile", true,
        "Resource profile to be requested to run the shell command");
    opts.addOption("num_containers", true,
        "No. of containers on which the shell command needs to be executed");
    opts.addOption("priority", true, "Application Priority. Default 0");
    opts.addOption("container_retry_policy", true,
        "Retry policy when container fails to run, "
            + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
            + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
    opts.addOption("container_retry_error_codes", true,
        "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
            + "codes is specified with this option, "
            + "e.g. --container_retry_error_codes 1,2,3");
    opts.addOption("container_max_retries", true,
        "If container could retry, it specifies max retires");
    opts.addOption("container_retry_interval", true,
        "Interval between each retry, unit is milliseconds");
    opts.addOption("container_failures_validity_interval", true,
        "Failures which are out of the time window will not be added to"
            + " the number of container retry attempts");
    opts.addOption("placement_spec", true, "Placement specification");
    opts.addOption("debug", false, "Dump out debug information");
    opts.addOption("keep_containers_across_application_attempts", false,
        "Flag to indicate whether to keep containers across application "
            + "attempts."
            + " If the flag is true, running containers will not be killed when"
            + " application attempt fails and these containers will be "
            + "retrieved by"
            + " the new application attempt ");
    opts.addOption("localized_files", true, "List of localized files");
    opts.addOption("homedir", true, "Home Directory of Job Owner");

    opts.addOption("help", false, "Print usage");
    CommandLine cliParser = new GnuParser().parse(opts, args);

    if (args.length == 0) {
      printUsage(opts);
      throw new IllegalArgumentException(
          "No args specified for application master to initialize");
    }

    //Check whether customer log4j.properties file exists
    if (fileExist(log4jPath)) {
      try {
        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
            log4jPath);
      } catch (Exception e) {
        LOG.warn("Can not set up custom log4j properties. " + e);
      }
    }

    appName = cliParser.getOptionValue("appname", "DistributedShell");

    if (cliParser.hasOption("help")) {
      printUsage(opts);
      return false;
    }

    if (cliParser.hasOption("debug")) {
      dumpOutDebugInfo();
    }

    homeDirectory = cliParser.hasOption("homedir") ?
        new Path(cliParser.getOptionValue("homedir")) :
        new Path("/user/" + System.getenv(ApplicationConstants.
        Environment.USER.name()));

    if (cliParser.hasOption("placement_spec")) {
      String placementSpec = cliParser.getOptionValue("placement_spec");
      String decodedSpec = getDecodedPlacementSpec(placementSpec);
      LOG.info("Placement Spec received [{}]", decodedSpec);

      this.numTotalContainers = 0;
      int globalNumOfContainers = Integer
          .parseInt(cliParser.getOptionValue("num_containers", "0"));
      parsePlacementSpecs(decodedSpec, globalNumOfContainers);
      LOG.info("Total num containers requested [{}]", numTotalContainers);

      if (numTotalContainers == 0) {
        throw new IllegalArgumentException(
            "Cannot run distributed shell with no containers");
      }
    }

    Map<String, String> envs = System.getenv();

    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
      if (cliParser.hasOption("app_attempt_id")) {
        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
        appAttemptID = ApplicationAttemptId.fromString(appIdStr);
      } else {
        throw new IllegalArgumentException(
            "Application Attempt Id not set in the environment");
      }
    } else {
      ContainerId containerId = ContainerId.fromString(envs
          .get(Environment.CONTAINER_ID.name()));
      appAttemptID = containerId.getApplicationAttemptId();
      appId = appAttemptID.getApplicationId();
    }

    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
          + " not set in the environment");
    }
    if (!envs.containsKey(Environment.NM_HOST.name())) {
      throw new RuntimeException(Environment.NM_HOST.name()
          + " not set in the environment");
    }
    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
      throw new RuntimeException(Environment.NM_HTTP_PORT
          + " not set in the environment");
    }
    if (!envs.containsKey(Environment.NM_PORT.name())) {
      throw new RuntimeException(Environment.NM_PORT.name()
          + " not set in the environment");
    }

    LOG.info("Application master for app" + ", appId="
        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
        + appAttemptID.getApplicationId().getClusterTimestamp()
        + ", attemptId=" + appAttemptID.getAttemptId());

    if (!fileExist(shellCommandPath)
        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
      throw new IllegalArgumentException(
          "No shell command or shell script specified to be executed by application master");
    }

    if (fileExist(shellCommandPath)) {
      shellCommand = readContent(shellCommandPath);
    }

    if (fileExist(shellArgsPath)) {
      shellArgs = readContent(shellArgsPath);
    }

    if (cliParser.hasOption("shell_env")) {
      String shellEnvs[] = cliParser.getOptionValues("shell_env");
      for (String env : shellEnvs) {
        env = env.trim();
        int index = env.indexOf('=');
        if (index == -1) {
          shellEnv.put(env, "");
          continue;
        }
        String key = env.substring(0, index);
        String val = "";
        if (index < (env.length() - 1)) {
          val = env.substring(index + 1);
        }
        shellEnv.put(key, val);
      }
    }

    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);

      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
        shellScriptPathTimestamp = Long.parseLong(envs
            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
      }
      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
        shellScriptPathLen = Long.parseLong(envs
            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
      }
      if (!scriptPath.isEmpty()
          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
        LOG.error("Illegal values in env for shell script path" + ", path="
            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
            + shellScriptPathTimestamp);
        throw new IllegalArgumentException(
            "Illegal values in env for shell script path");
      }
    }

    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
    }

    if (cliParser.hasOption("container_type")) {
      String containerTypeStr = cliParser.getOptionValue("container_type");
      if (Arrays.stream(ExecutionType.values()).noneMatch(
          executionType -> executionType.toString()
              .equals(containerTypeStr))) {
        throw new IllegalArgumentException("Invalid container_type: "
            + containerTypeStr);
      }
      containerType = ExecutionType.valueOf(containerTypeStr);
    }
    if (cliParser.hasOption("promote_opportunistic_after_start")) {
      autoPromoteContainers = true;
    }
    if (cliParser.hasOption("enforce_execution_type")) {
      enforceExecType = true;
    }
    containerMemory = Integer.parseInt(cliParser.getOptionValue(
        "container_memory", "-1"));
    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
        "container_vcores", "-1"));
    containerResources = new HashMap<>();
    if (cliParser.hasOption("container_resources")) {
      Map<String, Long> resources = Client.parseResourcesString(
          cliParser.getOptionValue("container_resources"));
      for (Map.Entry<String, Long> entry : resources.entrySet()) {
        containerResources.put(entry.getKey(), entry.getValue());
      }
    }
    containerResourceProfile =
        cliParser.getOptionValue("container_resource_profile", "");

    keepContainersAcrossAttempts = cliParser.hasOption(
        "keep_containers_across_application_attempts");

    if (this.placementSpecs == null) {
      numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
          "num_containers", "1"));
    }
    if (numTotalContainers == 0) {
      throw new IllegalArgumentException(
          "Cannot run distributed shell with no containers");
    }
    requestPriority = Integer.parseInt(cliParser
        .getOptionValue("priority", "0"));

    containerRetryPolicy = ContainerRetryPolicy.values()[
        Integer.parseInt(cliParser.getOptionValue(
            "container_retry_policy", "0"))];
    if (cliParser.hasOption("container_retry_error_codes")) {
      containerRetryErrorCodes = new HashSet<>();
      for (String errorCode :
          cliParser.getOptionValue("container_retry_error_codes").split(",")) {
        containerRetryErrorCodes.add(Integer.parseInt(errorCode));
      }
    }
    containerMaxRetries = Integer.parseInt(
        cliParser.getOptionValue("container_max_retries", "0"));
    containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
        "container_retry_interval", "0"));
    containerFailuresValidityInterval = Long.parseLong(
        cliParser.getOptionValue("container_failures_validity_interval", "-1"));
    if (!YarnConfiguration.timelineServiceEnabled(conf)) {
      timelineClient = null;
      timelineV2Client = null;
      LOG.warn("Timeline service is not enabled");
    }

    if (cliParser.hasOption("localized_files")) {
      String localizedFilesArg = cliParser.getOptionValue("localized_files");
      if (localizedFilesArg.contains(",")) {
        String[] files = localizedFilesArg.split(",");
        localizableFiles = Arrays.asList(files);
      } else {
        localizableFiles.add(localizedFilesArg);
      }
    }

    return true;
  }