in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssMergeManager.java [98:198]
/**
 * Creates a merge manager that sizes its in-memory merge from the Tez shuffle settings and
 * resolves a remote filesystem for the remote spill path configured under
 * {@code RSS_REMOTE_SPILL_STORAGE_PATH}.
 *
 * <p>The memory limit is the smaller of {@code initialMemoryAvailable} and
 * {@code Constants.TEZ_RUNTIME_TASK_MEMORY}. When that setting is absent, it falls back to
 * the task's total available memory multiplied by the shuffle fetch-buffer percent. The merge
 * threshold and the single-shuffle limit are derived from that memory limit.
 *
 * @param conf job configuration read for buffer sizes, memory percentages and the spill path
 * @param localFS local filesystem, forwarded to the superclass constructor
 * @param inputContext input context used for total task memory and counters
 * @param combiner optional combiner, stored and forwarded to the superclass
 * @param spilledRecordsCounter counter for spilled records
 * @param reduceCombineInputCounter counter forwarded to the superclass
 * @param mergedMapOutputsCounter counter for merged map outputs
 * @param exceptionReporter reporter stored and forwarded to the superclass
 * @param initialMemoryAvailable upper bound for the in-memory merge memory limit
 * @param codec compression codec, stored and forwarded to the superclass
 * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
 * @param ifileReadAheadLength read-ahead length; only kept when read-ahead is enabled
 * @param remoteConf base configuration for the remote filesystem; copied, not modified
 * @param replication value applied as {@code dfs.replication} on the remote configuration copy
 * @param retries value applied as {@code dfs.client.block.write.retries} on the copy
 * @param appAttemptId application attempt id, stored on this instance
 * @throws RssException if the remote spill path is unset or blank, or if the remote
 *     filesystem cannot be initialized (the underlying failure is attached as the cause)
 */
public RssMergeManager(
    Configuration conf,
    FileSystem localFS,
    InputContext inputContext,
    Combiner combiner,
    TezCounter spilledRecordsCounter,
    TezCounter reduceCombineInputCounter,
    TezCounter mergedMapOutputsCounter,
    ExceptionReporter exceptionReporter,
    long initialMemoryAvailable,
    CompressionCodec codec,
    boolean ifileReadAheadEnabled,
    int ifileReadAheadLength,
    Configuration remoteConf,
    int replication,
    int retries,
    String appAttemptId) {
  // NOTE(review): the third superclass argument is passed as null; presumably this is the
  // local-directory allocator, which is not needed when spilling remotely. Confirm against
  // the MergeManager constructor.
  super(
      conf,
      localFS,
      null,
      inputContext,
      combiner,
      spilledRecordsCounter,
      reduceCombineInputCounter,
      mergedMapOutputsCounter,
      exceptionReporter,
      initialMemoryAvailable,
      codec,
      ifileReadAheadEnabled,
      ifileReadAheadLength);
  this.conf = conf;
  this.inputContext = inputContext;
  this.exceptionReporter = exceptionReporter;
  this.codec = codec;
  this.combiner = combiner;
  this.initialMemoryAvailable = initialMemoryAvailable;
  this.ifileReadAhead = ifileReadAheadEnabled;
  // The read-ahead length only matters when read-ahead is on; otherwise it is forced to 0.
  this.ifileReadAheadLength = this.ifileReadAhead ? ifileReadAheadLength : 0;
  this.ifileBufferSize =
      conf.getInt(
          "io.file.buffer.size", TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
  this.appAttemptId = appAttemptId;
  this.cleanup =
      conf.getBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
          TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);

  // Memory sizing. Checks already performed in MergeManager are intentionally not repeated.
  final float maxInMemCopyUse =
      conf.getFloat(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
  final long memLimit =
      conf.getLong(
          Constants.TEZ_RUNTIME_TASK_MEMORY,
          (long) (inputContext.getTotalMemoryAvailableToTask() * maxInMemCopyUse));
  // Never exceed the memory that was actually granted to this component.
  this.memoryLimit = Math.min(this.initialMemoryAvailable, memLimit);
  this.mergeThreshold =
      (long)
          (this.memoryLimit
              * conf.getFloat(
                  TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
                  TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
  final float singleShuffleMemoryLimitPercent =
      conf.getFloat(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
  // Capped at Integer.MAX_VALUE.
  this.maxSingleShuffleLimit =
      (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);

  // Counters.
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
  this.additionalBytesRead =
      inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);

  // Remote spill filesystem. Validate the path before building a Path from it. Otherwise a
  // missing setting would be reported as a generic filesystem-init failure instead of this
  // explicit message.
  this.spillBasePath = conf.get(RSS_REMOTE_SPILL_STORAGE_PATH);
  if (StringUtils.isBlank(this.spillBasePath)) {
    throw new RssException("You must set remote spill path!");
  }
  // Work on a copy so the caller's remoteConf is not mutated by the overrides below.
  Configuration remoteConfCopied = new Configuration(remoteConf);
  remoteConfCopied.setInt("dfs.replication", replication);
  remoteConfCopied.setInt("dfs.client.block.write.retries", retries);
  try {
    this.remoteFS =
        HadoopFilesystemProvider.getFilesystem(new Path(spillBasePath), remoteConfCopied);
  } catch (Exception e) {
    // Keep the underlying failure as the cause so the real reason is not lost.
    throw new RssException("Cannot init remoteFS on path:" + spillBasePath, e);
  }

  // Created last, after all fields above have been initialized.
  this.inMemoryMerger = createRssInMemoryMerger();
}