public MergeManager()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java [166:317]


  /**
   * Creates a merge manager, deriving its memory limits and merge thresholds from
   * {@code conf} and from the memory made available to the task, and instantiating the
   * in-memory, on-disk and (optionally) memory-to-memory mergers.
   *
   * <p>Both the shuffle memory limit and the post-merge memory limit are capped at
   * {@code initialMemoryAvailable}. The size limit for a single in-memory shuffle output is
   * additionally capped at {@link Integer#MAX_VALUE}.
   *
   * @param conf configuration from which all runtime settings are read
   * @param localFS local file system; it is cast to {@code LocalFileSystem} to obtain the
   *        raw file system, so any other implementation fails with a
   *        {@link ClassCastException}
   * @param localDirAllocator allocator retained by this manager
   * @param inputContext context used for the task identifiers, the counters, the total
   *        memory available to the task and the vertex names used in log messages
   * @param combiner combiner retained by this manager
   * @param spilledRecordsCounter counter retained by this manager
   * @param reduceCombineInputCounter counter retained by this manager
   * @param mergedMapOutputsCounter counter retained by this manager
   * @param exceptionReporter reporter retained by this manager
   * @param initialMemoryAvailable upper bound, in bytes, applied to both the shuffle memory
   *        limit and the post-merge memory limit
   * @param codec compression codec retained by this manager
   * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
   * @param ifileReadAheadLength read-ahead length; ignored (treated as 0) when
   *        {@code ifileReadAheadEnabled} is {@code false}
   * @throws IllegalArgumentException if the shuffle fetch buffer percent is outside
   *         {@code [0, 1]} or the single shuffle memory limit percent is outside
   *         {@code (0, 1]}
   * @throws TezUncheckedException if the post-merge buffer percent is outside {@code [0, 1]}
   * @throws RuntimeException if the computed single shuffle limit is not strictly smaller
   *         than the computed merge threshold
   */
  public MergeManager(Configuration conf,
                      FileSystem localFS,
                      LocalDirAllocator localDirAllocator,
                      InputContext inputContext,
                      Combiner combiner,
                      TezCounter spilledRecordsCounter,
                      TezCounter reduceCombineInputCounter,
                      TezCounter mergedMapOutputsCounter,
                      ExceptionReporter exceptionReporter,
                      long initialMemoryAvailable,
                      CompressionCodec codec,
                      boolean ifileReadAheadEnabled,
                      int ifileReadAheadLength) {
    this.inputContext = inputContext;
    this.conf = conf;
    this.localDirAllocator = localDirAllocator;
    this.exceptionReporter = exceptionReporter;
    this.initialMemoryAvailable = initialMemoryAvailable;

    this.combiner = combiner;

    this.reduceCombineInputCounter = reduceCombineInputCounter;
    this.spilledRecordsCounter = spilledRecordsCounter;
    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
    this.mapOutputFile = new TezTaskOutputFiles(conf,
        inputContext.getUniqueIdentifier(),
        inputContext.getDagIdentifier());

    this.localFS = localFS;
    // The raw (non-checksummed) file system is only reachable through LocalFileSystem.
    this.rfs = ((LocalFileSystem)localFS).getRaw();

    this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
    this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
    this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
    this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);

    this.cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
        TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);

    this.codec = codec;
    this.ifileReadAhead = ifileReadAheadEnabled;
    // A read-ahead length is only meaningful when read-ahead is enabled.
    this.ifileReadAheadLength = ifileReadAheadEnabled ? ifileReadAheadLength : 0;
    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);

    // Figure out initial memory req start
    final float maxInMemCopyUse =
      conf.getFloat(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
      throw new IllegalArgumentException("Invalid value for " +
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " +
          maxInMemCopyUse);
    }

    // Allow unit tests to fix Runtime memory
    long memLimit = conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, (long)(inputContext
        .getTotalMemoryAvailableToTask() * maxInMemCopyUse));

    float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
        TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
      throw new TezUncheckedException("Invalid value for "
          + TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + ": "
          + maxRedPer);
    }

    long maxRedBuffer = (long) (inputContext.getTotalMemoryAvailableToTask() * maxRedPer);
    // Figure out initial memory req end

    // Neither limit may exceed the memory that was actually made available.
    this.memoryLimit = Math.min(this.initialMemoryAvailable, memLimit);
    this.postMergeMemLimit = Math.min(this.initialMemoryAvailable, maxRedBuffer);

    if (LOG.isDebugEnabled()) {
      LOG.debug(
          inputContext.getInputOutputVertexNames() + ": " + "InitialRequest: ShuffleMem=" + memLimit +
              ", postMergeMem=" + maxRedBuffer
              + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable +
              ". Updated to: ShuffleMem="
              + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
    }

    this.ioSortFactor =
        conf.getInt(
            TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
            TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);

    final float singleShuffleMemoryLimitPercent =
        conf.getFloat(
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
    if (singleShuffleMemoryLimitPercent <= 0.0f
        || singleShuffleMemoryLimitPercent > 1.0f) {
      throw new IllegalArgumentException("Invalid value for "
          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
          + singleShuffleMemoryLimitPercent);
    }

    //TODO: Cap it to MAX_VALUE until MapOutput starts supporting > 2 GB
    // The cap must be applied in long arithmetic: Integer.MAX_VALUE is not exactly
    // representable as a float (it rounds up to 2^31), so a float-based Math.min would
    // allow the limit to end up one past the int range.
    this.maxSingleShuffleLimit =
        Math.min((long) (memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);
    // The memory-to-memory segment threshold defaults to the sort factor.
    this.memToMemMergeOutputsThreshold =
            conf.getInt(
                TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS,
                ioSortFactor);
    this.mergeThreshold =
        (long)(this.memoryLimit *
               conf.getFloat(
                   TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
                   TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
    // Logged before the consistency check below so the offending values are visible on failure.
    LOG.info(inputContext.getInputOutputVertexNames() + ": MergerManager: memoryLimit=" + memoryLimit + ", " +
             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
             "mergeThreshold=" + mergeThreshold + ", " +
             "ioSortFactor=" + ioSortFactor + ", " +
             "postMergeMem=" + postMergeMemLimit + ", " +
             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);

    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
      throw new RuntimeException("Invalid configuration: "
          + "maxSingleShuffleLimit should be less than mergeThreshold. "
          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
          + ", mergeThreshold: " + this.mergeThreshold);
    }

    boolean allowMemToMemMerge =
        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
    // The memory-to-memory merger is optional; it stays null when disabled.
    if (allowMemToMemMerge) {
      this.memToMemMerger =
          new IntermediateMemoryToMemoryMerger(this, memToMemMergeOutputsThreshold);
    } else {
      this.memToMemMerger = null;
    }

    this.inMemoryMerger = new InMemoryMerger(this);

    this.onDiskMerger = new OnDiskMerger(this);

    this.serializationContext = new SerializationContext(conf);
  }