in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java [166:317]
/**
 * Creates the merge manager for an ordered-grouped shuffle input.
 *
 * <p>Reads the shuffle/merge tuning knobs from {@code conf}, validates the percentage settings,
 * caps both the in-memory shuffle budget and the post-merge budget at
 * {@code initialMemoryAvailable}, and creates the in-memory and on-disk mergers. A
 * memory-to-memory merger is created only when
 * {@code TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM} is enabled.
 *
 * @param conf runtime configuration supplying the shuffle/merge settings
 * @param localFS local file system; it is cast to {@code LocalFileSystem} and its raw file
 *     system is retained as well
 * @param localDirAllocator allocator for local directories
 * @param inputContext input context providing identifiers, counters and total task memory
 * @param combiner combiner stored for use by the mergers
 * @param spilledRecordsCounter counter for spilled records
 * @param reduceCombineInputCounter counter for combiner input records
 * @param mergedMapOutputsCounter counter for merged map outputs
 * @param exceptionReporter reporter stored for propagating errors
 * @param initialMemoryAvailable memory granted to this input; upper bound for both the shuffle
 *     memory limit and the post-merge memory limit
 * @param codec compression codec
 * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
 * @param ifileReadAheadLength read-ahead length; stored as 0 when read-ahead is disabled
 * @throws IllegalArgumentException if the fetch buffer percent is not in [0, 1] or the single
 *     shuffle memory limit percent is not in (0, 1] (NaN is rejected)
 * @throws TezUncheckedException if the post-merge buffer percent is not in [0, 1]
 * @throws RuntimeException if the derived maxSingleShuffleLimit is not below mergeThreshold
 * @throws ClassCastException if {@code localFS} is not a {@code LocalFileSystem}
 */
public MergeManager(Configuration conf,
    FileSystem localFS,
    LocalDirAllocator localDirAllocator,
    InputContext inputContext,
    Combiner combiner,
    TezCounter spilledRecordsCounter,
    TezCounter reduceCombineInputCounter,
    TezCounter mergedMapOutputsCounter,
    ExceptionReporter exceptionReporter,
    long initialMemoryAvailable,
    CompressionCodec codec,
    boolean ifileReadAheadEnabled,
    int ifileReadAheadLength) {
  this.inputContext = inputContext;
  this.conf = conf;
  this.localDirAllocator = localDirAllocator;
  this.exceptionReporter = exceptionReporter;
  this.initialMemoryAvailable = initialMemoryAvailable;
  this.combiner = combiner;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
  this.mapOutputFile = new TezTaskOutputFiles(conf,
      inputContext.getUniqueIdentifier(),
      inputContext.getDagIdentifier());
  this.localFS = localFS;
  this.rfs = ((LocalFileSystem) localFS).getRaw();
  this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
  this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
  this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
  this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
  this.cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
      TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
  this.codec = codec;
  this.ifileReadAhead = ifileReadAheadEnabled;
  // The read-ahead length is only meaningful when read-ahead is enabled.
  this.ifileReadAheadLength = ifileReadAheadEnabled ? ifileReadAheadLength : 0;
  this.ifileBufferSize = conf.getInt("io.file.buffer.size",
      TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);

  // Figure out initial memory req start
  final float maxInMemCopyUse =
      conf.getFloat(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
  // Written as a negated in-range test so that NaN is rejected too.
  if (!(maxInMemCopyUse >= 0.0f && maxInMemCopyUse <= 1.0f)) {
    throw new IllegalArgumentException("Invalid value for " +
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " +
        maxInMemCopyUse);
  }
  // Allow unit tests to fix Runtime memory
  long memLimit = conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, (long) (inputContext
      .getTotalMemoryAvailableToTask() * maxInMemCopyUse));

  float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
      TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
  // Written as a negated in-range test so that NaN is rejected too.
  if (!(maxRedPer >= 0.0f && maxRedPer <= 1.0f)) {
    throw new TezUncheckedException("Invalid value for "
        + TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + ": " + maxRedPer);
  }
  long maxRedBuffer = (long) (inputContext.getTotalMemoryAvailableToTask() * maxRedPer);
  // Figure out initial memory req end

  // Neither budget may exceed the memory actually granted to this input.
  this.memoryLimit = Math.min(this.initialMemoryAvailable, memLimit);
  this.postMergeMemLimit = Math.min(this.initialMemoryAvailable, maxRedBuffer);

  if (LOG.isDebugEnabled()) {
    LOG.debug(
        inputContext.getInputOutputVertexNames() + ": " + "InitialRequest: ShuffleMem=" + memLimit +
            ", postMergeMem=" + maxRedBuffer
            + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable +
            ". Updated to: ShuffleMem="
            + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
  }

  this.ioSortFactor =
      conf.getInt(
          TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
          TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);

  final float singleShuffleMemoryLimitPercent =
      conf.getFloat(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
  // Accepts (0, 1]; the negated form also rejects NaN, which would otherwise
  // silently collapse maxSingleShuffleLimit to 0 below.
  if (!(singleShuffleMemoryLimitPercent > 0.0f
      && singleShuffleMemoryLimitPercent <= 1.0f)) {
    throw new IllegalArgumentException("Invalid value for "
        + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
        + singleShuffleMemoryLimitPercent);
  }

  //TODO: Cap it to MAX_VALUE until MapOutput starts supporting > 2 GB
  this.maxSingleShuffleLimit =
      (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);
  this.memToMemMergeOutputsThreshold =
      conf.getInt(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS,
          ioSortFactor);
  // NOTE(review): the merge percent is not range-checked here; only the
  // maxSingleShuffleLimit < mergeThreshold invariant below constrains it.
  this.mergeThreshold =
      (long) (this.memoryLimit *
          conf.getFloat(
              TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
              TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
  LOG.info(inputContext.getInputOutputVertexNames() + ": MergerManager: memoryLimit=" + memoryLimit + ", " +
      "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
      "mergeThreshold=" + mergeThreshold + ", " +
      "ioSortFactor=" + ioSortFactor + ", " +
      "postMergeMem=" + postMergeMemLimit + ", " +
      "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);

  // A single shuffled output must fit below the merge trigger point.
  if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
    throw new RuntimeException("Invalid configuration: "
        + "maxSingleShuffleLimit should be less than mergeThreshold, "
        + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
        + ", mergeThreshold: " + this.mergeThreshold);
  }

  boolean allowMemToMemMerge =
      conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
  this.memToMemMerger = allowMemToMemMerge
      ? new IntermediateMemoryToMemoryMerger(this, memToMemMergeOutputsThreshold)
      : null;
  this.inMemoryMerger = new InMemoryMerger(this);
  this.onDiskMerger = new OnDiskMerger(this);
  this.serializationContext = new SerializationContext(conf);
}