public List<Event> initialize()

in tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java [102:174]


  public List<Event> initialize() throws IOException, InterruptedException {
    LOG.info("Initializing Simple Output");
    getContext().requestInitialMemory(0L, null); // mandatory call, even though no initial memory is needed
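    // Zero-padded number formats used to construct output file names: task
    // numbers get five digits, other name components three.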
    taskNumberFormat.setMinimumIntegerDigits(5);
    taskNumberFormat.setGroupingUsed(false);
    nonTaskNumberFormat.setMinimumIntegerDigits(3);
    nonTaskNumberFormat.setGroupingUsed(false);
    Configuration conf = TezUtils.createConfFromUserPayload(
        getContext().getUserPayload());
    this.jobConf = new JobConf(conf);
    // Add tokens to the jobConf - in case they are accessed within the RW / OF
    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
    this.useNewApi = this.jobConf.getUseNewMapper();
    this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
        false);
    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
        getContext().getDAGAttemptNumber());
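    // MapReduce OutputFormats expect an MR-style task identity, so build a
    // synthetic TaskAttemptID from the Tez task context and publish it through
    // the standard MR configuration keys below.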
    TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
        .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
            getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
            getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput);
    jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
    jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
    jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
    jobConf.setInt(JobContext.TASK_PARTITION,
        taskAttemptId.getTaskID().getId());
    jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
    
    if (useNewApi) {
      // set the output part name to have a unique prefix
      if (jobConf.get("mapreduce.output.basename") == null) {
        jobConf.set("mapreduce.output.basename", getOutputFileNamePrefix());
      }
    }

    outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    

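    // Create the OutputFormat and RecordWriter through whichever MapReduce API
    // (mapreduce vs. mapred) the job was configured against.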
    if (useNewApi) {
      newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
      try {
        newOutputFormat =
            ReflectionUtils.newInstance(
                newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
      } catch (ClassNotFoundException cnfe) {
        throw new IOException(cnfe);
      }

      try {
        newRecordWriter =
            newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while creating record writer", e);
      }
    } else {
      oldApiTaskAttemptContext =
          new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
              jobConf, taskAttemptId,
              new MRTaskReporter(getContext()));
      oldOutputFormat = jobConf.getOutputFormat();

      FileSystem fs = FileSystem.get(jobConf);
      String finalName = getOutputName();

      oldRecordWriter =
          oldOutputFormat.getRecordWriter(
              fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
    }
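    // Set up the OutputCommitter for the chosen API; it is used later when the
    // output is committed or aborted.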
    initCommitter(jobConf, useNewApi);

    LOG.info("Initialized Simple Output"
        + ", using_new_api: " + useNewApi);
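    // No initialization events are produced; a null return is treated the same
    // as an empty event list.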
    return null;
  }
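
For context, the user payload this method deserializes at the top is normally
produced by MROutput's config builder when the DAG is assembled. A minimal
sketch of the producing side, assuming illustrative names (tezConf, outputPath,
vertex):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  import org.apache.tez.dag.api.DataSinkDescriptor;
  import org.apache.tez.mapreduce.output.MROutput;

  // Serialize a Configuration into the user payload that initialize() later
  // rebuilds via TezUtils.createConfFromUserPayload().
  Configuration conf = new Configuration(tezConf);   // tezConf is illustrative
  DataSinkDescriptor sink = MROutput
      .createConfigBuilder(conf, TextOutputFormat.class, outputPath)
      .build();

  // Attaching the sink to a vertex causes the framework to instantiate
  // MROutput and call initialize() in each task of that vertex.
  vertex.addDataSink("MROutput", sink);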