in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java [133:273]
/**
 * Creates a merge manager for shuffled inputs and sizes its memory budgets from
 * the configuration and the memory actually granted to this input.
 *
 * <p>Two budgets are computed:
 * <ul>
 *   <li>{@code memoryLimit}: the task memory (from {@code Constants.TEZ_RUNTIME_TASK_MEMORY},
 *       defaulting to the task's total available memory capped at {@link Integer#MAX_VALUE})
 *       scaled by {@code TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT}.</li>
 *   <li>{@code postMergeMemLimit}: the task's total available memory scaled by
 *       {@code TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT}, capped at
 *       {@link Integer#MAX_VALUE}.</li>
 * </ul>
 * Both are then capped at {@code initialMemoryAvailable}. {@code maxSingleShuffleLimit} and
 * {@code mergeThreshold} are derived from {@code memoryLimit}.
 *
 * @param conf configuration read for buffer sizes, memory percentages, sort factor and
 *     mem-to-mem merge settings
 * @param localFS local file system; must be a {@link LocalFileSystem}, whose raw file system
 *     is also retained
 * @param localDirAllocator allocator for local directories, stored on this instance
 * @param inputContext input context; supplies the unique identifier, counters and the task's
 *     total available memory
 * @param combiner combiner stored on this instance (may be whatever the caller supplies)
 * @param spilledRecordsCounter counter stored on this instance
 * @param reduceCombineInputCounter counter stored on this instance
 * @param mergedMapOutputsCounter counter stored on this instance
 * @param exceptionReporter reporter stored on this instance
 * @param initialMemoryAvailable memory granted to this input; upper bound for both
 *     {@code memoryLimit} and {@code postMergeMemLimit}
 * @param codec compression codec stored on this instance
 * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
 * @param ifileReadAheadLength read-ahead length; treated as 0 when read-ahead is disabled
 * @throws IllegalArgumentException if the shuffle input buffer percent is not within [0, 1],
 *     or the single-shuffle memory limit percent is not within (0, 1] (NaN is rejected)
 * @throws TezUncheckedException if the input buffer percent is not within [0, 1]
 *     (NaN is rejected)
 * @throws RuntimeException if the derived {@code maxSingleShuffleLimit} is not strictly
 *     less than the derived {@code mergeThreshold}
 * @throws ClassCastException if {@code localFS} is not a {@link LocalFileSystem}
 */
public MergeManager(Configuration conf,
    FileSystem localFS,
    LocalDirAllocator localDirAllocator,
    TezInputContext inputContext,
    Combiner combiner,
    TezCounter spilledRecordsCounter,
    TezCounter reduceCombineInputCounter,
    TezCounter mergedMapOutputsCounter,
    ExceptionReporter exceptionReporter,
    long initialMemoryAvailable,
    CompressionCodec codec,
    boolean ifileReadAheadEnabled,
    int ifileReadAheadLength) {
  this.inputContext = inputContext;
  this.conf = conf;
  this.localDirAllocator = localDirAllocator;
  this.exceptionReporter = exceptionReporter;
  this.initialMemoryAvailable = initialMemoryAvailable;
  this.combiner = combiner;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
  this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
  this.localFS = localFS;
  // Keep the raw file system underlying the local FS. This cast fails with a
  // ClassCastException for any FileSystem that is not a LocalFileSystem.
  this.rfs = ((LocalFileSystem) localFS).getRaw();
  this.numDiskToDiskMerges =
      inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
  this.numMemToDiskMerges =
      inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
  this.additionalBytesWritten =
      inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
  this.additionalBytesRead =
      inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
  this.codec = codec;
  this.ifileReadAhead = ifileReadAheadEnabled;
  // The read-ahead length is only meaningful when read-ahead is enabled.
  this.ifileReadAheadLength = ifileReadAheadEnabled ? ifileReadAheadLength : 0;
  this.ifileBufferSize = conf.getInt("io.file.buffer.size",
      TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);

  // Figure out initial memory req start
  final float maxInMemCopyUse = conf.getFloat(
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT_DEFAULT);
  // Written as a negated in-range test so that NaN is rejected as well
  // (every comparison with NaN is false).
  if (!(maxInMemCopyUse >= 0.0 && maxInMemCopyUse <= 1.0)) {
    throw new IllegalArgumentException("Invalid value for "
        + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
        + maxInMemCopyUse);
  }
  // Allow unit tests to fix Runtime memory
  long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
      Math.min(inputContext.getTotalMemoryAvailableToTask(), Integer.MAX_VALUE))
      * maxInMemCopyUse);

  float maxRedPer = conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
      TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
  if (!(maxRedPer >= 0.0 && maxRedPer <= 1.0)) {
    throw new TezUncheckedException("Invalid value for "
        + TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT + ": " + maxRedPer);
  }
  // TODO maxRedBuffer should be a long.
  int maxRedBuffer = (int) Math.min(inputContext.getTotalMemoryAvailableToTask() * maxRedPer,
      Integer.MAX_VALUE);
  // Figure out initial memory req end

  // Never budget more memory than was actually granted to this input.
  this.memoryLimit = Math.min(this.initialMemoryAvailable, memLimit);
  // maxRedBuffer fits in an int, so the minimum does too; the narrowing cast is lossless
  // whenever initialMemoryAvailable is the smaller value.
  this.postMergeMemLimit = (int) Math.min(this.initialMemoryAvailable, maxRedBuffer);
  LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
      + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem="
      + this.memoryLimit + ", postMergeMem=" + this.postMergeMemLimit);

  this.ioSortFactor = conf.getInt(
      TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
      TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);

  final float singleShuffleMemoryLimitPercent = conf.getFloat(
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
  // Must lie in (0, 1]; the negated form also rejects NaN.
  if (!(singleShuffleMemoryLimitPercent > 0.0f
      && singleShuffleMemoryLimitPercent <= 1.0f)) {
    throw new IllegalArgumentException("Invalid value for "
        + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
        + singleShuffleMemoryLimitPercent);
  }
  this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
  // Defaults to the sort factor when not configured explicitly.
  this.memToMemMergeOutputsThreshold = conf.getInt(
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS,
      ioSortFactor);
  this.mergeThreshold = (long) (this.memoryLimit * conf.getFloat(
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
  LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", "
      + "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", "
      + "mergeThreshold=" + mergeThreshold + ", "
      + "ioSortFactor=" + ioSortFactor + ", "
      + "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);

  // Reject configurations in which a single shuffle may be as large as the merge threshold.
  if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
    throw new RuntimeException("Invalid configuration: "
        + "maxSingleShuffleLimit should be less than mergeThreshold. "
        + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
        + ", mergeThreshold: " + this.mergeThreshold);
  }

  boolean allowMemToMemMerge = conf.getBoolean(
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
      TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
  if (allowMemToMemMerge) {
    this.memToMemMerger =
        new IntermediateMemoryToMemoryMerger(this, memToMemMergeOutputsThreshold);
  } else {
    this.memToMemMerger = null;
  }
  // NOTE(review): 'this' escapes to the merger threads before construction completes;
  // this is pre-existing behavior and is kept unchanged here.
  this.inMemoryMerger = new InMemoryMerger(this);
  this.onDiskMerger = new OnDiskMerger(this);
}