in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java [112:221]
/**
 * Builds the shuffle controller for one input: reads shuffle-related settings from
 * {@code conf}, looks up the task counters it reports into, and wires together the
 * {@link ShuffleScheduler}, {@link ShuffleInputEventHandler} and {@link MergeManager}.
 * A single-thread daemon executor and the {@code RunShuffleCallable} are created here,
 * but the callable is not submitted by this constructor.
 *
 * @param inputContext context of the input this shuffle serves; source of the DAG/vertex
 *                     names, counters and shuffle service metadata
 * @param conf configuration the shuffle settings (codec, read-ahead, SSL, parallel
 *             copies) are read from
 * @param numInputs number of inputs assigned to this shuffle; also caps the number of
 *                  fetchers
 * @param initialMemoryAvailable memory figure handed unchanged to the {@link MergeManager}
 * @throws IOException propagated from the helpers invoked during setup (for example
 *                     resolving the current user or the local file system)
 */
public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs,
    long initialMemoryAvailable) throws IOException {
  this.inputContext = inputContext;
  this.conf = conf;
  this.httpConnectionParams =
      ShuffleUtils.constructHttpShuffleConnectionParams(conf);
  this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
      inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
      this.conf, UserGroupInformation.getCurrentUser().getShortUserName());

  this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName());

  this.jobTokenSecret = ShuffleUtils
      .getJobTokenSecretFromTokenBytes(inputContext
          .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));

  // A codec is only instantiated when intermediate input compression is enabled;
  // a null codec means "no compression" to everything downstream in this constructor.
  if (ConfigUtils.isIntermediateInputCompressed(conf)) {
    Class<? extends CompressionCodec> codecClass =
        ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, conf);
  } else {
    codec = null;
  }

  // The read-ahead length is only looked up when read-ahead is on; otherwise it is 0.
  this.ifileReadAhead = conf.getBoolean(
      TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
      TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
  if (this.ifileReadAhead) {
    this.ifileReadAheadLength = conf.getInt(
        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
  } else {
    this.ifileReadAheadLength = 0;
  }

  Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);

  FileSystem localFS = FileSystem.getLocal(this.conf);
  LocalDirAllocator localDirAllocator =
      new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

  // TODO TEZ Get rid of Map / Reduce references.
  // Counters handed to the scheduler (fetch-side) and the merge manager (merge-side).
  TezCounter shuffledInputsCounter =
      inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
  TezCounter reduceShuffleBytes =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
  TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
      TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
  TezCounter failedShuffleCounter =
      inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
  TezCounter spilledRecordsCounter =
      inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
  TezCounter reduceCombineInputCounter =
      inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  TezCounter mergedMapOutputsCounter =
      inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
  TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
      TaskCounter.SHUFFLE_BYTES_TO_DISK);
  TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
      TaskCounter.SHUFFLE_BYTES_TO_MEM);

  // Each field is separated by ", " so the codec name and the read-ahead flag
  // do not run together in the log line.
  LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
      + (codec == null ? "None" : codec.getClass().getName())
      + ", ifileReadAhead: " + ifileReadAhead);

  boolean sslShuffle = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);

  scheduler = new ShuffleScheduler(
      this.inputContext,
      this.conf,
      numInputs,
      this,
      shuffledInputsCounter,
      reduceShuffleBytes,
      reduceDataSizeDecompressed,
      failedShuffleCounter,
      bytesShuffedToDisk,
      bytesShuffedToMem);

  eventHandler = new ShuffleInputEventHandler(
      inputContext,
      scheduler,
      sslShuffle);

  merger = new MergeManager(
      this.conf,
      localFS,
      localDirAllocator,
      inputContext,
      combiner,
      spilledRecordsCounter,
      reduceCombineInputCounter,
      mergedMapOutputsCounter,
      this,
      initialMemoryAvailable,
      codec,
      ifileReadAhead,
      ifileReadAheadLength);

  // Single daemon thread, named after the (cleaned) source vertex.
  ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
      .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());

  // Never plan for more fetchers than there are inputs to fetch.
  int configuredNumFetchers =
      conf.getInt(
          TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
          TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
  numFetchers = Math.min(configuredNumFetchers, numInputs);
  LOG.info("Num fetchers being started: " + numFetchers);

  fetchers = Lists.newArrayListWithCapacity(numFetchers);

  executor = MoreExecutors.listeningDecorator(rawExecutor);
  runShuffleCallable = new RunShuffleCallable();
}