in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java [92:181]
/**
 * Creates a merge manager that merges shuffled map outputs in memory and writes merged results
 * to a remote file system rooted at {@code basePath}.
 *
 * <p>Side effects: {@code remoteConf} is modified in place ({@code dfs.replication} and {@code
 * dfs.client.block.write.retries} are overwritten), and the in-memory merger created by {@code
 * createRssInMemoryMerger()} is started before this constructor returns.
 *
 * @param appId application id, stored for later use by this manager
 * @param reduceId attempt id of the reduce task that owns this manager
 * @param jobConf job configuration; shuffle buffer/merge percentages and the total reduce memory
 *     are read from it
 * @param basePath remote directory used to obtain the remote {@code FileSystem}
 * @param replication value written to {@code dfs.replication} on {@code remoteConf}
 * @param retries value written to {@code dfs.client.block.write.retries} on {@code remoteConf}
 * @param localFS local file system, forwarded to the superclass
 * @param localDirAllocator local directory allocator, forwarded to the superclass
 * @param reporter progress reporter
 * @param codec compression codec for map outputs, may be whatever the job configured
 * @param combinerClass optional combiner class
 * @param combineCollector collector used when a combiner runs
 * @param spilledRecordsCounter counter of spilled records
 * @param reduceCombineInputCounter counter of combiner input records on the reduce side
 * @param mergedMapOutputsCounter counter of merged map outputs
 * @param exceptionReporter sink for errors raised by background merge work
 * @param mergePhase progress object for the merge phase
 * @param mapOutputFile map output file helper, forwarded to the superclass
 * @param remoteConf configuration used to open the remote file system (mutated, see above)
 * @throws RssException if the remote file system for {@code basePath} cannot be obtained; the
 *     underlying failure is attached as the cause
 * @throws IllegalArgumentException if {@link MRJobConfig#SHUFFLE_INPUT_BUFFER_PERCENT} is outside
 *     {@code [0.0, 1.0]}
 */
public RssRemoteMergeManagerImpl(
    String appId,
    TaskAttemptID reduceId,
    JobConf jobConf,
    String basePath,
    int replication,
    int retries,
    FileSystem localFS,
    LocalDirAllocator localDirAllocator,
    Reporter reporter,
    CompressionCodec codec,
    Class<? extends Reducer> combinerClass,
    Task.CombineOutputCollector<K, V> combineCollector,
    Counters.Counter spilledRecordsCounter,
    Counters.Counter reduceCombineInputCounter,
    Counters.Counter mergedMapOutputsCounter,
    ExceptionReporter exceptionReporter,
    Progress mergePhase,
    MapOutputFile mapOutputFile,
    JobConf remoteConf) {
  super(
      reduceId,
      jobConf,
      localFS,
      localDirAllocator,
      reporter,
      codec,
      combinerClass,
      combineCollector,
      spilledRecordsCounter,
      reduceCombineInputCounter,
      mergedMapOutputsCounter,
      exceptionReporter,
      mergePhase,
      mapOutputFile);
  this.appId = appId;
  this.reduceId = reduceId;
  this.jobConf = jobConf;
  this.exceptionReporter = exceptionReporter;
  this.mergePhase = mergePhase;
  this.reporter = reporter;
  this.codec = codec;
  this.combinerClass = combinerClass;
  this.combineCollector = combineCollector;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
  try {
    // NOTE: these settings are written into the caller's remoteConf object, not a copy.
    remoteConf.setInt("dfs.replication", replication);
    remoteConf.setInt("dfs.client.block.write.retries", retries); // origin=3
    this.remoteFS = HadoopFilesystemProvider.getFilesystem(new Path(basePath), remoteConf);
  } catch (Exception e) {
    // Chain the original failure so the root cause (auth, bad URI, I/O, ...) is not lost.
    throw new RssException("Cannot init remoteFS on path:" + basePath, e);
  }
  this.basePath = basePath;
  final float maxInMemCopyUse =
      jobConf.getFloat(
          MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
          MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
  // The in-memory buffer is a fraction of the reducer's memory, so it must lie in [0, 1].
  if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
    throw new IllegalArgumentException(
        "Invalid value for " + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " + maxInMemCopyUse);
  }
  // Allow unit tests to fix Runtime memory: REDUCE_MEMORY_TOTAL_BYTES overrides the JVM max heap.
  this.memoryLimit =
      (long)
          (jobConf.getLong(
                  MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Runtime.getRuntime().maxMemory())
              * maxInMemCopyUse);
  this.usedMemory = 0L;
  this.commitMemory = 0L;
  // Fraction of the in-memory budget at which an in-memory merge is considered.
  this.mergeThreshold =
      (long)
          (this.memoryLimit
              * jobConf.getFloat(
                  MRJobConfig.SHUFFLE_MERGE_PERCENT, MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT));
  LOG.info(
      "MergerManager: memoryLimit=" + memoryLimit + ", " + "mergeThreshold=" + mergeThreshold);
  // NOTE(review): the merger is created and started from the constructor, so it may observe
  // `this` before construction completes if createRssInMemoryMerger() is overridden — confirm.
  this.inMemoryMerger = createRssInMemoryMerger();
  this.inMemoryMerger.start();
}