public RssRemoteMergeManagerImpl()

in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java [92:181]


  public RssRemoteMergeManagerImpl(
      String appId,
      TaskAttemptID reduceId,
      JobConf jobConf,
      String basePath,
      int replication,
      int retries,
      FileSystem localFS,
      LocalDirAllocator localDirAllocator,
      Reporter reporter,
      CompressionCodec codec,
      Class<? extends Reducer> combinerClass,
      Task.CombineOutputCollector<K, V> combineCollector,
      Counters.Counter spilledRecordsCounter,
      Counters.Counter reduceCombineInputCounter,
      Counters.Counter mergedMapOutputsCounter,
      ExceptionReporter exceptionReporter,
      Progress mergePhase,
      MapOutputFile mapOutputFile,
      JobConf remoteConf) {
    super(
        reduceId,
        jobConf,
        localFS,
        localDirAllocator,
        reporter,
        codec,
        combinerClass,
        combineCollector,
        spilledRecordsCounter,
        reduceCombineInputCounter,
        mergedMapOutputsCounter,
        exceptionReporter,
        mergePhase,
        mapOutputFile);

    this.appId = appId;
    this.reduceId = reduceId;
    this.jobConf = jobConf;
    this.exceptionReporter = exceptionReporter;
    this.mergePhase = mergePhase;

    this.reporter = reporter;
    this.codec = codec;
    this.combinerClass = combinerClass;
    this.combineCollector = combineCollector;
    this.reduceCombineInputCounter = reduceCombineInputCounter;
    this.spilledRecordsCounter = spilledRecordsCounter;
    this.mergedMapOutputsCounter = mergedMapOutputsCounter;

    try {
      remoteConf.setInt("dfs.replication", replication);
      remoteConf.setInt("dfs.client.block.write.retries", retries); // origin=3
      this.remoteFS = HadoopFilesystemProvider.getFilesystem(new Path(basePath), remoteConf);
    } catch (Exception e) {
      throw new RssException("Cannot init remoteFS on path:" + basePath);
    }

    this.basePath = basePath;

    final float maxInMemCopyUse =
        jobConf.getFloat(
            MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
            MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
      throw new IllegalArgumentException(
          "Invalid value for " + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " + maxInMemCopyUse);
    }

    // Allow unit tests to fix Runtime memory
    this.memoryLimit =
        (long)
            (jobConf.getLong(
                    MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Runtime.getRuntime().maxMemory())
                * maxInMemCopyUse);

    this.usedMemory = 0L;
    this.commitMemory = 0L;

    this.mergeThreshold =
        (long)
            (this.memoryLimit
                * jobConf.getFloat(
                    MRJobConfig.SHUFFLE_MERGE_PERCENT, MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT));
    LOG.info(
        "MergerManager: memoryLimit=" + memoryLimit + ", " + "mergeThreshold=" + mergeThreshold);

    this.inMemoryMerger = createRssInMemoryMerger();
    this.inMemoryMerger.start();
  }