in tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java [102:174]
public List<Event> initialize() throws IOException, InterruptedException {
LOG.info("Initializing Simple Output");
getContext().requestInitialMemory(0L, null); // mandatory call, even though MROutput requests no memory
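// Formatters for the numeric part of output file names (e.g. part-00000);
// grouping is disabled so no locale-specific separators appear in names.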
taskNumberFormat.setMinimumIntegerDigits(5);
taskNumberFormat.setGroupingUsed(false);
nonTaskNumberFormat.setMinimumIntegerDigits(3);
nonTaskNumberFormat.setGroupingUsed(false);
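// Rebuild the user's Configuration from the payload serialized into this Output's descriptor.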
Configuration conf = TezUtils.createConfFromUserPayload(
getContext().getUserPayload());
this.jobConf = new JobConf(conf);
// Add tokens to the jobConf - in case they are accessed within the RecordWriter / OutputFormat
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
this.useNewApi = this.jobConf.getUseNewMapper();
this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
false);
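// Publish the DAG attempt number as the MR application attempt id, so
// committers that track attempts (e.g. FileOutputCommitter) can see it.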
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
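// Synthesize a MapReduce TaskAttemptID from Tez's task/vertex identifiers
// so MR-facing classes (OutputFormat, committer) see familiar ids.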
TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput);
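// Populate the standard MR job/task properties that user OutputFormats and RecordWriters may read.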
jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
jobConf.setInt(JobContext.TASK_PARTITION,
taskAttemptId.getTaskID().getId());
jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
if (useNewApi) {
// set the output part name to have a unique prefix, unless one was already configured
if (jobConf.get("mapreduce.output.basename") == null) {
jobConf.set("mapreduce.output.basename", getOutputFileNamePrefix());
}
}
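// Counter updated once per record written through this output.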
outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
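// New (mapreduce) API: instantiate the OutputFormat reflectively and open its RecordWriter.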
if (useNewApi) {
newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
try {
newOutputFormat =
ReflectionUtils.newInstance(
newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
try {
newRecordWriter =
newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
} catch (InterruptedException e) {
throw new IOException("Interrupted while creating record writer", e);
}
} else {
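// Old (mapred) API: build a TaskAttemptContext and open the RecordWriter directly against the FileSystem.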
oldApiTaskAttemptContext =
new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
jobConf, taskAttemptId,
new MRTaskReporter(getContext()));
oldOutputFormat = jobConf.getOutputFormat();
FileSystem fs = FileSystem.get(jobConf);
String finalName = getOutputName();
oldRecordWriter =
oldOutputFormat.getRecordWriter(
fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
}
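// Set up the OutputCommitter that will commit this output's data on success.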
initCommitter(jobConf, useNewApi);
LOG.info("Initialized Simple Output"
+ ", using_new_api: " + useNewApi);
return null;
}
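
For orientation, a minimal sketch of how this initialize() path is usually reached: a DataSinkDescriptor built via MROutput's config builder serializes the Configuration into the user payload that TezUtils.createConfFromUserPayload deserializes above. This assumes a Tez version that provides MROutput.createConfigBuilder; the output path, processor class, and vertex/sink names below are illustrative assumptions, not taken from this file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.mapreduce.output.MROutput;

// Sketch only: path, processor class, and names are hypothetical.
Configuration conf = new Configuration();
DataSinkDescriptor sink = MROutput
    .createConfigBuilder(conf, TextOutputFormat.class, "/tmp/mroutput-demo")
    .build(); // serializes conf into the user payload read by initialize() above
Vertex v = Vertex.create("proc", ProcessorDescriptor.create("com.example.MyProcessor"));
v.addDataSink("MROutput", sink);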