in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffle.java [103:213]
/**
 * Assembles the collaborators used by this shuffle: the optional intermediate-data codec, the
 * {@link MergeManager}, the {@link RssShuffleScheduler}, the input event handler and the
 * single-threaded daemon executor on which the shuffle-and-merge callable is meant to run.
 *
 * <p>Note that {@code this} is handed to both the merge manager and the scheduler before the
 * constructor has finished, so the statement order below is significant.
 *
 * @param inputContext context of the input being shuffled; supplies the source vertex name and
 *     the task counters
 * @param conf configuration consulted for compression and IFile read-ahead settings
 * @param numInputs number of inputs, logged and forwarded to the scheduler
 * @param initialMemoryAvailable memory figure forwarded to the merge manager
 * @param shuffleId identifier forwarded to the scheduler
 * @param applicationAttemptId application attempt forwarded to the scheduler
 * @throws IOException propagated from the set-up calls made here (e.g. obtaining the local
 *     file system)
 */
public RssShuffle(
    InputContext inputContext,
    Configuration conf,
    int numInputs,
    long initialMemoryAvailable,
    int shuffleId,
    ApplicationAttemptId applicationAttemptId)
    throws IOException {
  this.inputContext = inputContext;
  this.conf = conf;
  this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());

  // Codec stays null unless intermediate input compression is switched on.
  CompressionCodec resolvedCodec = null;
  if (ConfigUtils.isIntermediateInputCompressed(conf)) {
    Class<? extends CompressionCodec> codecClass =
        ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
    resolvedCodec = ReflectionUtils.newInstance(codecClass, conf);
    // Work around needed for HADOOP-12191. Avoids the native initialization synchronization race
    resolvedCodec.getDecompressorType();
  }
  codec = resolvedCodec;

  // The read-ahead length is only looked up when read-ahead itself is enabled.
  this.ifileReadAhead =
      conf.getBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
  this.ifileReadAheadLength =
      this.ifileReadAhead
          ? conf.getInt(
              TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
              TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)
          : 0;

  Combiner mergeCombiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
  FileSystem localFileSystem = FileSystem.getLocal(this.conf);
  LocalDirAllocator dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

  // TODO TEZ Get rid of Map / Reduce references.
  TezCounter spilledRecords =
      inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
  TezCounter combineInputRecords =
      inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  TezCounter mergedMapOutputs =
      inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);

  String codecLabel = "None";
  if (codec != null) {
    codecLabel = codec.getClass().getName();
  }
  LOG.info(
      srcNameTrimmed
          + ": Shuffle assigned with "
          + numInputs
          + " inputs, codec: "
          + codecLabel
          + ", ifileReadAhead: "
          + ifileReadAhead);

  startTime = System.currentTimeMillis();

  merger =
      new MergeManager(
          this.conf,
          localFileSystem,
          dirAllocator,
          inputContext,
          mergeCombiner,
          spilledRecords,
          combineInputRecords,
          mergedMapOutputs,
          this,
          initialMemoryAvailable,
          codec,
          ifileReadAhead,
          ifileReadAheadLength);

  // The merge manager is deliberately passed twice; it fills two distinct scheduler parameters.
  rssScheduler =
      new RssShuffleScheduler(
          this.inputContext,
          this.conf,
          numInputs,
          this,
          merger,
          merger,
          startTime,
          codec,
          ifileReadAhead,
          ifileReadAheadLength,
          srcNameTrimmed,
          shuffleId,
          applicationAttemptId);

  this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
  this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);

  eventHandler =
      new ShuffleInputEventHandlerOrderedGrouped(
          inputContext, rssScheduler, ShuffleUtils.isTezShuffleHandler(conf));

  // One daemon thread, named after the source vertex, wrapped as a listening executor.
  ExecutorService singleThreadPool =
      Executors.newFixedThreadPool(
          1,
          new ThreadFactoryBuilder()
              .setDaemon(true)
              .setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}")
              .build());
  executor = MoreExecutors.listeningDecorator(singleThreadPool);

  rssRunShuffleCallable = new RssRunShuffleCallable();
}