public boolean run()

in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java [678:1103]


  /**
   * Submits the distributed shell application to YARN and monitors it.
   *
   * <p>The method performs, in order:
   * <ol>
   *   <li>starts the YARN client and logs cluster metrics, running node
   *       reports, queue information and queue ACLs;</li>
   *   <li>validates that the target queue exists and that any requested
   *       resource profiles are known;</li>
   *   <li>creates a new application and caps the AM memory / vcores at the
   *       maximum capability reported in the new-application response;</li>
   *   <li>uploads the AM jar, optional log4j properties, files to localize,
   *       the shell script, and the shell command / arguments;</li>
   *   <li>builds the AM environment, classpath and launch command;</li>
   *   <li>attaches delegation tokens (when security is enabled) and docker
   *       client credentials (when configured) to the AM container;</li>
   *   <li>submits the application and delegates to
   *       {@code monitorApplication}.</li>
   * </ol>
   *
   * @return the value returned by {@code monitorApplication(applicationId)}
   * @throws IOException if a resource profile is requested but profiles are
   *     not enabled or the profile is unknown, if the RM principal cannot be
   *     determined while security is enabled, or if an I/O call made here
   *     fails
   * @throws YarnException propagated from the YARN client calls made here
   * @throws IllegalArgumentException if the configured queue is not present
   * @throws UncheckedIOException if a file to localize does not exist, cannot
   *     be read, is a directory, or cannot be uploaded
   */
  public boolean run() throws IOException, YarnException {
    LOG.info("Running Client");
    isRunning.set(true);
    yarnClient.start();
    // set the client start time.
    clientStartTime = System.currentTimeMillis();

    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
    LOG.info("Got Cluster metric info from ASM"
        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());

    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
        NodeState.RUNNING);
    LOG.info("Got Cluster node info from ASM");
    for (NodeReport node : clusterNodeReports) {
      LOG.info("Got node report from ASM for"
          + ", nodeId=" + node.getNodeId()
          + ", nodeAddress=" + node.getHttpAddress()
          + ", nodeRackName=" + node.getRackName()
          + ", nodeNumContainers=" + node.getNumContainers());
    }

    // Fail fast if the target queue is unknown to the scheduler.
    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
    if (queueInfo == null) {
      throw new IllegalArgumentException(String
          .format("Queue %s not present in scheduler configuration.",
              this.amQueue));
    }

    LOG.info("Queue info"
        + ", queueName=" + queueInfo.getQueueName()
        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
        + ", queueApplicationCount=" + queueInfo.getApplications().size()
        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());

    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
    for (QueueUserACLInfo aclInfo : listAclInfo) {
      for (QueueACL userAcl : aclInfo.getUserAcls()) {
        LOG.info("User ACL Info for Queue"
            + ", queueName=" + aclInfo.getQueueName()
            + ", userAcl=" + userAcl.name());
      }
    }

    if (domainId != null && domainId.length() > 0 && toCreateDomain) {
      prepareTimelineDomain();
    }

    // A null map means the resource profile feature is disabled on the
    // cluster; that is only an error if a profile was actually requested.
    Map<String, Resource> profiles;
    try {
      profiles = yarnClient.getResourceProfiles();
    } catch (YARNFeatureNotEnabledException re) {
      profiles = null;
    }

    List<String> appProfiles = new ArrayList<>(2);
    appProfiles.add(amResourceProfile);
    appProfiles.add(containerResourceProfile);
    for (String appProfile : appProfiles) {
      if (appProfile != null && !appProfile.isEmpty()) {
        if (profiles == null) {
          String message = "Resource profiles is not enabled";
          LOG.error(message);
          throw new IOException(message);
        }
        if (!profiles.containsKey(appProfile)) {
          String message = "Unknown resource profile '" + appProfile
              + "'. Valid resource profiles are " + profiles.keySet();
          LOG.error(message);
          throw new IOException(message);
        }
      }
    }

    // Get a new application id
    YarnClientApplication app = yarnClient.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    // TODO get min/max resource capabilities from RM and change memory ask if needed
    // If we do not have min/max, we may not be able to correctly request
    // the required resources from the RM for the app master
    // Memory ask has to be a multiple of min and less than max.
    // Dump out information about cluster capability as seen by the resource manager
    long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
    LOG.info("Max mem capability of resources in this cluster " + maxMem);

    // A resource ask cannot exceed the max.
    if (amMemory > maxMem) {
      LOG.info("AM memory specified above max threshold of cluster. Using max value."
          + ", specified=" + amMemory
          + ", max=" + maxMem);
      amMemory = maxMem;
    }

    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);

    if (amVCores > maxVCores) {
      LOG.info("AM virtual cores specified above max threshold of cluster. "
          + "Using max value." + ", specified=" + amVCores
          + ", max=" + maxVCores);
      amVCores = maxVCores;
    }

    // Fetch the submission context and remember the assigned application id
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    applicationId = appContext.getApplicationId();

    // Set up resource type requirements
    // For now, both memory and vcores are supported, so we set memory and
    // vcores requirements
    List<ResourceTypeInfo> resourceTypes = yarnClient.getResourceTypeInfo();
    setAMResourceCapability(appContext, profiles, resourceTypes);
    setContainerResources(profiles, resourceTypes);

    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
    appContext.setApplicationName(appName);

    if (attemptFailuresValidityInterval >= 0) {
      appContext
        .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
    }

    // Application tags: user supplied tags plus the timeline flow tags.
    Set<String> tags = new HashSet<String>();
    if (applicationTags != null) {
      tags.addAll(applicationTags);
    }
    if (flowName != null) {
      tags.add(TimelineUtils.generateFlowNameTag(flowName));
    }
    if (flowVersion != null) {
      tags.add(TimelineUtils.generateFlowVersionTag(flowVersion));
    }
    if (flowRunId != 0) {
      tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
    }
    appContext.setApplicationTags(tags);

    // set local resources for the application master
    // local files or archives as needed
    // In this scenario, the jar file for the application master is part of the local resources
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

    LOG.info("Copy App Master jar from local filesystem and add to local environment");
    // Copy the application master jar to the filesystem
    // Create a local resource to point to the destination jar path
    FileSystem fs = FileSystem.get(conf);
    addToLocalResources(fs, appMasterJar, appMasterJarPath,
        applicationId.toString(), localResources, null);

    // Set the log4j properties if needed
    if (!log4jPropFile.isEmpty()) {
      addToLocalResources(fs, log4jPropFile, log4jPath,
          applicationId.toString(), localResources, null);
    }

    // Process local files for localization
    // Here we just upload the files, the AM
    // will set up localization later.
    // Failures are reported as UncheckedIOException because they are raised
    // from inside the lambda.
    StringBuilder localizableFiles = new StringBuilder();
    filesToLocalize.stream().forEach(path -> {
      File f = new File(path);

      if (!f.exists()) {
        throw new UncheckedIOException(
            new IOException(path + " does not exist"));
      }

      if (!f.canRead()) {
        throw new UncheckedIOException(
            new IOException(path + " cannot be read"));
      }

      if (f.isDirectory()) {
        throw new UncheckedIOException(
          new IOException(path + " is a directory"));
      }

      try {
        String fileName = f.getName();
        uploadFile(fs, path, fileName, applicationId.toString());
        // Build a comma separated list of the uploaded file names.
        if (localizableFiles.length() == 0) {
          localizableFiles.append(fileName);
        } else {
          localizableFiles.append(",").append(fileName);
        }
      } catch (IOException e) {
        throw new UncheckedIOException("Cannot upload file: " + path, e);
      }
    });

    // The shell script has to be made available on the final container(s)
    // where it will be executed.
    // To do this, we need to first copy into the filesystem that is visible
    // to the yarn framework.
    // We do not need to set this as a local resource for the application
    // master as the application master does not need it.
    String hdfsShellScriptLocation = "";
    long hdfsShellScriptLen = 0;
    long hdfsShellScriptTimestamp = 0;
    if (!shellScriptPath.isEmpty()) {
      Path shellSrc = new Path(shellScriptPath);
      String shellPathSuffix =
          ApplicationMaster.getRelativePath(appName,
              applicationId.toString(),
              SCRIPT_PATH);
      Path shellDst =
          new Path(fs.getHomeDirectory(), shellPathSuffix);
      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
      hdfsShellScriptLocation = shellDst.toUri().toString();
      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
      hdfsShellScriptLen = shellFileStatus.getLen();
      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
    }

    if (!shellCommand.isEmpty()) {
      addToLocalResources(fs, null, shellCommandPath, applicationId.toString(),
          localResources, shellCommand);
    }

    if (shellArgs.length > 0) {
      addToLocalResources(fs, null, shellArgsPath, applicationId.toString(),
          localResources, StringUtils.join(shellArgs, " "));
    }

    // Set the env variables to be setup in the env where the application master will be run
    LOG.info("Set the environment for the application master");
    Map<String, String> env = new HashMap<String, String>();

    // put location of shell script into env
    // using the env info, the application master will create the correct local resource for the
    // eventual containers that will be launched to execute the shell scripts
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
    if (domainId != null && domainId.length() > 0) {
      env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
    }

    // Add AppMaster.jar location to classpath
    // At some point we should not be required to add
    // the hadoop specific classpaths to the env.
    // It should be provided out of the box.
    // For now setting all required classpaths including
    // the classpath to "." for the application jar
    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
      .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR)
          .append(c.trim());
    }
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
      "./log4j.properties");

    // add the runtime classpath needed for tests to work
    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR)
          .append(System.getProperty("java.class.path"));
    }

    env.put("CLASSPATH", classPathEnv.toString());

    // Set the necessary command to execute the application master
    Vector<CharSequence> vargs = new Vector<CharSequence>(30);

    // Set java executable command
    LOG.info("Setting up app master command");
    // Need extra quote here because JAVA_HOME might contain space on Windows,
    // e.g. C:/Program Files/Java...
    vargs.add("\"" + Environment.JAVA_HOME.$$() + "/bin/java\"");
    // Set Xmx based on am memory size
    vargs.add("-Xmx" + amMemory + "m");
    // JDK17 support
    vargs.add(ApplicationConstants.JVM_ADD_OPENS_VAR);
    // Set class name
    vargs.add(appMasterMainClass);
    // Set params for Application Master
    if (containerType != null) {
      vargs.add("--container_type " + String.valueOf(containerType));
    }
    if (autoPromoteContainers) {
      vargs.add("--promote_opportunistic_after_start");
    }
    if (enforceExecType) {
      vargs.add("--enforce_execution_type");
    }
    if (containerMemory > 0) {
      vargs.add("--container_memory " + String.valueOf(containerMemory));
    }
    if (containerVirtualCores > 0) {
      vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
    }
    if (!containerResources.isEmpty()) {
      Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("=");
      vargs.add("--container_resources " + joiner.join(containerResources));
    }
    if (containerResourceProfile != null && !containerResourceProfile
        .isEmpty()) {
      vargs.add("--container_resource_profile " + containerResourceProfile);
    }
    vargs.add("--num_containers " + String.valueOf(numContainers));
    if (placementSpec != null && placementSpec.length() > 0) {
      // Encode the spec to avoid passing special chars via shell arguments.
      String encodedSpec = Base64.getEncoder()
          .encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8));
      LOG.info("Encode placement spec: " + encodedSpec);
      vargs.add("--placement_spec " + encodedSpec);
    }
    if (null != nodeLabelExpression) {
      appContext.setNodeLabelExpression(nodeLabelExpression);
    }
    vargs.add("--priority " + String.valueOf(shellCmdPriority));

    if (keepContainers) {
      vargs.add("--keep_containers_across_application_attempts");
    }
    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
    }
    if (debugFlag) {
      vargs.add("--debug");
    }
    if (localizableFiles.length() > 0) {
      vargs.add("--localized_files " + localizableFiles.toString());
    }
    vargs.add("--appname " + appName);

    vargs.add("--homedir " + fs.getHomeDirectory());

    vargs.addAll(containerRetryOptions);

    // Redirect the AM's stdout / stderr into the container log directory.
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

    // Get final command
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }

    LOG.info("Completed setting up app master command " + command.toString());
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
      localResources, env, commands, null, null, null);

    // Service data is a binary blob that can be passed to the application
    // Not needed in this scenario

    // Setup security tokens
    Credentials rmCredentials = null;
    if (UserGroupInformation.isSecurityEnabled()) {
      // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
      rmCredentials = new Credentials();
      String tokenRenewer = YarnClientUtils.getRmPrincipal(conf);
      if (tokenRenewer == null || tokenRenewer.length() == 0) {
        throw new IOException(
          "Can't get Master Kerberos principal for the RM to use as renewer");
      }

      // For now, only getting tokens for the default file-system.
      final Token<?>[] fsTokens =
          fs.addDelegationTokens(tokenRenewer, rmCredentials);
      if (fsTokens != null) {
        for (Token<?> token : fsTokens) {
          LOG.info("Got dt for " + fs.getUri() + "; " + token);
        }
      }
    }

    // Add the docker client config credentials if supplied.
    Credentials dockerCredentials = null;
    if (dockerClientConfig != null) {
      dockerCredentials =
          DockerClientConfigHandler.readCredentialsFromConfigFile(
              new Path(dockerClientConfig), conf, applicationId.toString());
    }

    if (rmCredentials != null || dockerCredentials != null) {
      // Serialize everything as ONE token-storage stream. Each call to
      // writeTokenStorageToStream emits a complete stream of its own, so
      // writing the RM and docker credentials back to back would leave the
      // second set behind a consumer that reads a single Credentials object
      // from the buffer. Merge first, then write once.
      Credentials amCredentials;
      if (rmCredentials == null) {
        amCredentials = dockerCredentials;
      } else {
        amCredentials = rmCredentials;
        if (dockerCredentials != null) {
          amCredentials.addAll(dockerCredentials);
        }
      }
      DataOutputBuffer dob = new DataOutputBuffer();
      amCredentials.writeTokenStorageToStream(dob);
      ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
      amContainer.setTokens(tokens);
    }

    appContext.setAMContainerSpec(amContainer);

    // Set the priority for the application master
    // TODO - what is the range for priority? how to decide?
    Priority pri = Priority.newInstance(amPriority);
    appContext.setPriority(pri);

    // Set the queue to which this application is to be submitted in the RM
    appContext.setQueue(amQueue);

    specifyLogAggregationContext(appContext);

    // Submit the application to the applications manager
    // Ignore the response as either a valid response object is returned on success
    // or an exception thrown to denote some form of a failure
    LOG.info("Submitting application to ASM");

    yarnClient.submitApplication(appContext);

    // TODO
    // Try submitting the same request again
    // app submission failure?

    // Monitor the application
    return monitorApplication(applicationId);

  }