static ApplicationSubmissionContext createApplicationSubmissionContext()

in tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java [322:592]


  static ApplicationSubmissionContext createApplicationSubmissionContext(
      TezConfiguration conf, ApplicationId appId, DAG dag, String amName,
      AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
      Credentials sessionCreds)
          throws IOException, YarnException{

    Preconditions.checkNotNull(sessionCreds);

    FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
        TezCommonUtils.getTezBaseStagingPath(conf));
    String strAppId = appId.toString();
    Path tezSysStagingPath = TezCommonUtils.createTezSystemStagingPath(conf, strAppId);
    Path binaryConfPath = TezCommonUtils.getTezConfStagingPath(tezSysStagingPath);
    binaryConfPath = fs.makeQualified(binaryConfPath);

    // Setup resource requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(
        amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
            TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
    capability.setVirtualCores(
        amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
            TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
    if (LOG.isDebugEnabled()) {
      LOG.debug("AppMaster capability = " + capability);
    }

    // Setup required Credentials for the AM launch. DAG specific credentials
    // are handled separately.
    ByteBuffer securityTokens = null;
    // Setup security tokens
    Credentials amLaunchCredentials = new Credentials();
    if (amConfig.getCredentials() != null) {
      amLaunchCredentials.addAll(amConfig.getCredentials());
    }

    // Add Staging dir creds to the list of session credentials.
    TokenCache.obtainTokensForFileSystems(sessionCreds, new Path[] {binaryConfPath}, conf);

    // Add session specific credentials to the AM credentials.
    amLaunchCredentials.mergeAll(sessionCreds);

    DataOutputBuffer dob = new DataOutputBuffer();
    amLaunchCredentials.writeTokenStorageToStream(dob);
    securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Need to set credentials based on DAG and the URIs which have been set for the DAG.

    if (dag != null) {
      setupDAGCredentials(dag, sessionCreds, conf);
    }

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    String amOpts = amConfig.getTezConfiguration().get(
        TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
        TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
    amOpts = maybeAddDefaultMemoryJavaOpts(amOpts, capability,
        amConfig.getTezConfiguration().getDouble(TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION,
            TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT));
    vargs.add(amOpts);

    String amLogLevel = amConfig.getTezConfiguration().get(
        TezConfiguration.TEZ_AM_LOG_LEVEL,
        TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
    maybeAddDefaultLoggingJavaOpts(amLogLevel, vargs);

    // FIX sun bug mentioned in TEZ-327
    vargs.add("-Dsun.nio.ch.bugLevel=''");

    vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
    if (dag == null) {
      vargs.add("--" + TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
    }

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        File.separator + ApplicationConstants.STDOUT);
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        File.separator + ApplicationConstants.STDERR);


    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    if (LOG.isDebugEnabled()) {
      LOG.debug("Command to launch container for ApplicationMaster is : "
          + mergedCommand);
    }

    Map<String, String> environment = new TreeMap<String, String>();
    TezYARNUtils.setupDefaultEnv(environment, conf, TezConfiguration.TEZ_AM_LAUNCH_ENV,
        TezConfiguration.TEZ_AM_LAUNCH_ENV_DEFAULT);
    
    // finally apply env set in the code. This could potentially be removed in
    // TEZ-692
    if (amConfig.getEnv() != null) {
      for (Map.Entry<String, String> entry : amConfig.getEnv().entrySet()) {
        TezYARNUtils.addToEnvironment(environment, entry.getKey(),
            entry.getValue(), File.pathSeparator);
      }
    }
    
    Map<String, LocalResource> localResources =
        new TreeMap<String, LocalResource>();

    // Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials.
    if (amConfig.getLocalResources() != null) {
      localResources.putAll(amConfig.getLocalResources());
    }
    localResources.putAll(tezJarResources);

    // emit conf as PB file
    Configuration finalTezConf = createFinalTezConfForApp(conf,
      amConfig.getTezConfiguration());
    
    FSDataOutputStream amConfPBOutBinaryStream = null;
    try {
      ConfigurationProto.Builder confProtoBuilder =
          ConfigurationProto.newBuilder();
      Iterator<Entry<String, String>> iter = finalTezConf.iterator();
      while (iter.hasNext()) {
        Entry<String, String> entry = iter.next();
        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
        kvp.setKey(entry.getKey());
        kvp.setValue(entry.getValue());
        confProtoBuilder.addConfKeyValues(kvp);
      }
      //binary output
      amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath);
      confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);
    } finally {
      if(amConfPBOutBinaryStream != null){
        amConfPBOutBinaryStream.close();
      }
    }

    LocalResource binaryConfLRsrc =
        TezClientUtils.createLocalResource(fs,
            binaryConfPath, LocalResourceType.FILE,
            LocalResourceVisibility.APPLICATION);
    localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
        binaryConfLRsrc);

    // Create Session Jars definition to be sent to AM as a local resource
    Path sessionJarsPath = TezCommonUtils.getTezSessionJarStagingPath(tezSysStagingPath);
    FSDataOutputStream sessionJarsPBOutStream = null;
    try {
      Map<String, LocalResource> sessionJars =
        new HashMap<String, LocalResource>(tezJarResources.size() + 1);
      sessionJars.putAll(tezJarResources);
      sessionJars.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
        binaryConfLRsrc);
      DAGProtos.PlanLocalResourcesProto proto =
        DagTypeConverters.convertFromLocalResources(sessionJars);
      sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs, sessionJarsPath);
      proto.writeDelimitedTo(sessionJarsPBOutStream);
      
      // Write out the initial list of resources which will be available in the AM
      DAGProtos.PlanLocalResourcesProto amResourceProto;
      if (amConfig.getLocalResources() != null && !amConfig.getLocalResources().isEmpty()) {
        amResourceProto = DagTypeConverters.convertFromLocalResources(localResources);
      } else {
        amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance(); 
      }
      amResourceProto.writeDelimitedTo(sessionJarsPBOutStream);
    } finally {
      if (sessionJarsPBOutStream != null) {
        sessionJarsPBOutStream.close();
      }
    }

    LocalResource sessionJarsPBLRsrc =
      TezClientUtils.createLocalResource(fs,
        sessionJarsPath, LocalResourceType.FILE,
        LocalResourceVisibility.APPLICATION);
    localResources.put(
      TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME,
      sessionJarsPBLRsrc);

    if(dag != null) {

      for (Vertex v : dag.getVertices()) {
        if (tezJarResources != null) {
          v.getTaskLocalFiles().putAll(tezJarResources);
        }
        v.getTaskLocalFiles().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
            binaryConfLRsrc);

        Map<String, String> taskEnv = v.getTaskEnvironment();
        TezYARNUtils.setupDefaultEnv(taskEnv, conf,
            TezConfiguration.TEZ_TASK_LAUNCH_ENV,
            TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT);

        TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
      }

      // emit protobuf DAG file style
      Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Stage directory information for AppId :" + appId + " tezSysStagingPath :"
            + tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :"
            + sessionJarsPath + " binaryPlanPath :" + binaryPath);
      }
      amConfig.getTezConfiguration().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
          binaryPath.toUri().toString());

      DAGPlan dagPB = dag.createDag(null);

      FSDataOutputStream dagPBOutBinaryStream = null;

      try {
        //binary output
        dagPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryPath);
        dagPB.writeTo(dagPBOutBinaryStream);
      } finally {
        if(dagPBOutBinaryStream != null){
          dagPBOutBinaryStream.close();
        }
      }

      localResources.put(TezConfiguration.TEZ_PB_PLAN_BINARY_NAME,
        TezClientUtils.createLocalResource(fs,
          binaryPath, LocalResourceType.FILE,
          LocalResourceVisibility.APPLICATION));

      if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
        Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath);
        localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
            TezClientUtils.createLocalResource(fs,
                textPath, LocalResourceType.FILE,
                LocalResourceVisibility.APPLICATION));
      }
    }
    Map<ApplicationAccessType, String> acls
        = new HashMap<ApplicationAccessType, String>();

    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(localResources, environment,
            vargsFinal, null, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);

    appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
    appContext.setApplicationId(appId);
    appContext.setResource(capability);
    if (amConfig.getQueueName() != null) {
      appContext.setQueue(amConfig.getQueueName());
    }
    appContext.setApplicationName(amName);
    appContext.setCancelTokensWhenComplete(amConfig.getTezConfiguration().getBoolean(
        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
    appContext.setAMContainerSpec(amContainer);
    
    appContext.setMaxAppAttempts(
      finalTezConf.getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
        TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));

    return appContext;

  }