public JobStatus submitJob()

in tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java [522:627]


  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {

    // HACK! TEZ-604. Get rid of this once Hive moves all of it's tasks over to Tez native.
    maybeKillSession();
    
    ApplicationId appId = resMgrDelegate.getApplicationId();

    FileSystem fs = FileSystem.get(conf);
    // Loads the job.xml written by the user.
    JobConf jobConf = new JobConf(new TezConfiguration(conf));

    // Extract individual raw MR configs.
    Configuration[] stageConfs = MultiStageMRConfToTezTranslator.getStageConfs(jobConf);

    // Transform all confs to use Tez keys
    for (int i = 0; i < stageConfs.length; i++) {
      MRHelpers.translateVertexConfToTez(stageConfs[i]);
    }

    // create inputs to tezClient.submit()

    // FIXME set up job resources
    Map<String, LocalResource> jobLocalResources =
        createJobLocalResources(stageConfs[0], jobSubmitDir);

    // FIXME createDAG should take the tezConf as a parameter, instead of using
    // MR keys.
    DAG dag = createDAG(fs, jobId, stageConfs, jobSubmitDir, ts,
        jobLocalResources);

    List<String> vargs = new LinkedList<String>();
    // admin command opts and user command opts
    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
    vargs.add(mrAppMasterAdminOptions);

    // Add AM user command opts
    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
    vargs.add(mrAppMasterUserOptions);

    StringBuilder javaOpts = new StringBuilder();
    for (String varg : vargs) {
      javaOpts.append(varg).append(" ");
    }

    // Setup the CLASSPATH in environment
    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
    Map<String, String> environment = new HashMap<String, String>();

    // Setup the environment variables for AM
    MRHelpers.updateEnvironmentForMRAM(conf, environment);
    StringBuilder envStrBuilder = new StringBuilder();
    boolean first = true;
    for (Entry<String, String> entry : environment.entrySet()) {
      if (!first) {
        envStrBuilder.append(",");
      } else {
        first = false;
      }
      envStrBuilder.append(entry.getKey()).append("=").append(entry.getValue());
    }
    String envStr = envStrBuilder.toString();

    TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
    dagAMConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, javaOpts.toString());
    if (envStr.length() > 0) {
      dagAMConf.set(TezConfiguration.TEZ_AM_LAUNCH_ENV, envStr);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting MR AM env to : " + envStr);
      }
    }

    // Submit to ResourceManager
    try {
      dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
          jobSubmitDir);
      
      // Set Tez parameters based on MR parameters.
      String queueName = jobConf.get(JobContext.QUEUE_NAME,
          YarnConfiguration.DEFAULT_QUEUE_NAME);
      dagAMConf.set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
      
      int amMemMB = jobConf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
      int amCores = jobConf.getInt(MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
      dagAMConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, amMemMB);
      dagAMConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, amCores);

      dagAMConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 
          jobConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
      
      tezSession = new TezClient("MapReduce", dagAMConf, false, jobLocalResources, ts);
      tezSession.start();
      tezSession.submitDAGApplication(appId, dag);
      tezSession.stop();
    } catch (TezException e) {
      throw new IOException(e);
    }

    return getJobStatus(jobId);
  }