public static ApplicationSubmissionContext createApplicationSubmissionContext()

in tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java [455:703]


  public static ApplicationSubmissionContext createApplicationSubmissionContext(
      ApplicationId appId, DAG dag, String amName,
      AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
      Credentials sessionCreds, boolean tezLrsAsArchive,
      TezApiVersionInfo apiVersionInfo,
      ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
      throws IOException, YarnException {

    Objects.requireNonNull(sessionCreds);
    TezConfiguration conf = amConfig.getTezConfiguration();

    FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
        TezCommonUtils.getTezBaseStagingPath(conf));
    String strAppId = appId.toString();
    Path tezSysStagingPath = TezCommonUtils.createTezSystemStagingPath(conf, strAppId);
    Path binaryConfPath = TezCommonUtils.getTezConfStagingPath(tezSysStagingPath);
    binaryConfPath = fs.makeQualified(binaryConfPath);

    // Setup resource requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(
        amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
            TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
    capability.setVirtualCores(
        amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
            TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
    LOG.debug("AppMaster capability = {}", capability);

    // Setup required Credentials for the AM launch. DAG specific credentials
    // are handled separately.
    ByteBuffer securityTokens = null;
    Credentials amLaunchCredentials =
        prepareAmLaunchCredentials(amConfig, sessionCreds, conf, binaryConfPath);

    DataOutputBuffer dob = new DataOutputBuffer();
    amLaunchCredentials.writeTokenStorageToStream(dob);
    securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability);
    vargs.add(amOpts);

    String amLogLevelString = amConfig.getTezConfiguration().get(
        TezConfiguration.TEZ_AM_LOG_LEVEL,
        TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
    String[] amLogParams = parseLogParams(amLogLevelString);

    String amLogLevel = amLogParams[0];
    TezClientUtils.addLog4jSystemProperties(amLogLevel, vargs);


    // FIX sun bug mentioned in TEZ-327
    vargs.add("-Dsun.nio.ch.bugLevel=''");

    vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);
    if (dag == null) {
      vargs.add("--" + TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
    }

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        File.separator + ApplicationConstants.STDOUT);
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        File.separator + ApplicationConstants.STDERR);


    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    LOG.debug("Command to launch container for ApplicationMaster is : {}", mergedCommand);

    Map<String, String> environment = new TreeMap<String, String>();
    TezYARNUtils.setupDefaultEnv(environment, conf,
        TezConfiguration.TEZ_AM_LAUNCH_ENV,
        TezConfiguration.TEZ_AM_LAUNCH_ENV_DEFAULT,
        TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_ENV,
        TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_ENV_DEFAULT,
        tezLrsAsArchive);

    addVersionInfoToEnv(environment, apiVersionInfo);
    addLogParamsToEnv(environment, amLogParams);

    Map<String, LocalResource> amLocalResources =
        new TreeMap<String, LocalResource>();

    // Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials.
    if (amConfig.getAMLocalResources() != null) {
      amLocalResources.putAll(amConfig.getAMLocalResources());
    }
    amLocalResources.putAll(tezJarResources);

    TezConfiguration tezConf = amConfig.getTezConfiguration();
    // Merge the dag access controls into tez am config.
    if (dag != null && dag.getDagAccessControls() != null) {
      // Merge updates the conf object passed. In non session mode, same client object can be used
      // to submit multiple dags, copying this prevents ACL of one DAG from being used in another.
      tezConf = new TezConfiguration(amConfig.getTezConfiguration());
      dag.getDagAccessControls().mergeIntoAmAcls(tezConf);
    }

    // emit conf as PB file
    // don't overwrite existing conf, needed for TezClient.getClient() so existing containers have stable resource fingerprints
    if(!binaryConfPath.getFileSystem(tezConf).exists(binaryConfPath)) {
      ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
              servicePluginsDescriptor);

      FSDataOutputStream amConfPBOutBinaryStream = null;
      try {
        amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath);
        finalConfProto.writeTo(amConfPBOutBinaryStream);
      } finally {
        if (amConfPBOutBinaryStream != null) {
          amConfPBOutBinaryStream.close();
        }
      }
    }

    LocalResource binaryConfLRsrc =
        TezClientUtils.createLocalResource(fs,
            binaryConfPath, LocalResourceType.FILE,
            LocalResourceVisibility.APPLICATION);
    amConfig.setBinaryConfLR(binaryConfLRsrc);
    amLocalResources.put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
        binaryConfLRsrc);

    // Create Session Jars definition to be sent to AM as a local resource
    Path sessionJarsPath = TezCommonUtils.getTezAMJarStagingPath(tezSysStagingPath);
    FSDataOutputStream sessionJarsPBOutStream = null;
    try {
      sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs, sessionJarsPath);
      // Write out the initial list of resources which will be available in the AM
      DAGProtos.PlanLocalResourcesProto amResourceProto;
      if (amLocalResources != null && !amLocalResources.isEmpty()) {
        amResourceProto = DagTypeConverters.convertFromLocalResources(amLocalResources);
      } else {
        amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance();
      }
      amResourceProto.writeDelimitedTo(sessionJarsPBOutStream);
    } finally {
      if (sessionJarsPBOutStream != null) {
        sessionJarsPBOutStream.close();
      }
    }

    LocalResource sessionJarsPBLRsrc =
      TezClientUtils.createLocalResource(fs,
        sessionJarsPath, LocalResourceType.FILE,
        LocalResourceVisibility.APPLICATION);
    amLocalResources.put(
      TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME,
      sessionJarsPBLRsrc);

    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    ACLManager aclManager = new ACLManager(user, amConfig.getTezConfiguration());
    Map<ApplicationAccessType, String> acls = aclManager.toYARNACls();

    if(dag != null) {

      DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive,
          sessionCreds, servicePluginsDescriptor, javaOptsChecker);

      // emit protobuf DAG file style
      Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Stage directory information for AppId :" + appId + " tezSysStagingPath :"
            + tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :"
            + sessionJarsPath + " binaryPlanPath :" + binaryPath);
      }

      FSDataOutputStream dagPBOutBinaryStream = null;

      try {
        //binary output
        dagPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryPath);
        dagPB.writeTo(dagPBOutBinaryStream);
      } finally {
        if(dagPBOutBinaryStream != null){
          dagPBOutBinaryStream.close();
        }
      }

      amLocalResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME,
        TezClientUtils.createLocalResource(fs,
          binaryPath, LocalResourceType.FILE,
          LocalResourceVisibility.APPLICATION));

      if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
        Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath);
        amLocalResources.put(TezConstants.TEZ_PB_PLAN_TEXT_NAME,
            TezClientUtils.createLocalResource(fs,
                textPath, LocalResourceType.FILE,
                LocalResourceVisibility.APPLICATION));
      }
    }

    // Send the shuffle token as part of the AM launch context, so that the NM running the AM can
    // provide this to AuxServices running on the AM node - in case tasks run within the AM,
    // and no other task runs on this node.
    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
    serviceData.put(auxiliaryService,
        TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials)));

    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(amLocalResources, environment,
            vargsFinal, serviceData, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);

    Collection<String> tagsFromConf =
        amConfig.getTezConfiguration().getTrimmedStringCollection(
        TezConfiguration.TEZ_APPLICATION_TAGS);

    appContext.setApplicationType(TezConstants.TEZ_APPLICATION_TYPE);
    if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
      appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
    }
    appContext.setApplicationId(appId);
    appContext.setResource(capability);
    String queueName = amConfig.getQueueName();
    if (queueName != null && !queueName.isEmpty()) {
      appContext.setQueue(amConfig.getQueueName());
    }
    // set the application priority
    setApplicationPriority(appContext, amConfig);
    appContext.setApplicationName(amName);
    appContext.setCancelTokensWhenComplete(amConfig.getTezConfiguration().getBoolean(
        TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION,
        TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION_DEFAULT));
    appContext.setAMContainerSpec(amContainer);

    appContext.setMaxAppAttempts(
      amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
        TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));

    return appContext;

  }