public MergeManager()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java [133:273]


  /**
   * Creates a merge manager for a task's shuffled inputs and derives its memory
   * budgets from configuration.
   *
   * <p>Two budgets are computed from the task's memory:
   * <ul>
   *   <li>{@code memoryLimit}: the smaller of {@code initialMemoryAvailable} and the
   *       task memory scaled by
   *       {@link TezJobConfig#TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT}.</li>
   *   <li>{@code postMergeMemLimit}: the smaller of {@code initialMemoryAvailable}
   *       and the task memory scaled by
   *       {@link TezJobConfig#TEZ_RUNTIME_INPUT_BUFFER_PERCENT}.</li>
   * </ul>
   * The per-shuffle limit ({@code maxSingleShuffleLimit}) and the merge trigger
   * ({@code mergeThreshold}) are then computed as fractions of {@code memoryLimit}.
   *
   * @param conf configuration supplying buffer percentages, sort factor and merge settings
   * @param localFS local file system; must be a {@link LocalFileSystem} because its
   *     raw file system is extracted
   * @param localDirAllocator allocator for local directories, stored for later use
   * @param inputContext input context; supplies the unique identifier, task counters
   *     and the total memory available to the task
   * @param combiner combiner, stored for later use
   * @param spilledRecordsCounter counter for spilled records
   * @param reduceCombineInputCounter counter for combine input records
   * @param mergedMapOutputsCounter counter for merged map outputs
   * @param exceptionReporter receiver for errors, stored for later use
   * @param initialMemoryAvailable memory granted to this input; caps both
   *     {@code memoryLimit} and {@code postMergeMemLimit}
   * @param codec compression codec, stored for later use
   * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
   * @param ifileReadAheadLength read-ahead length; ignored (stored as 0) when
   *     read-ahead is disabled
   * @throws IllegalArgumentException if the shuffle input buffer percent is outside
   *     [0, 1] or the single-shuffle memory limit percent is outside (0, 1]
   * @throws TezUncheckedException if the input buffer percent is outside [0, 1]
   * @throws RuntimeException if the derived single-shuffle limit is not strictly
   *     below the merge threshold
   * @throws ClassCastException if {@code localFS} is not a {@link LocalFileSystem}
   */
  public MergeManager(Configuration conf, 
                      FileSystem localFS,
                      LocalDirAllocator localDirAllocator,  
                      TezInputContext inputContext,
                      Combiner combiner,
                      TezCounter spilledRecordsCounter,
                      TezCounter reduceCombineInputCounter,
                      TezCounter mergedMapOutputsCounter,
                      ExceptionReporter exceptionReporter,
                      long initialMemoryAvailable,
                      CompressionCodec codec,
                      boolean ifileReadAheadEnabled,
                      int ifileReadAheadLength) {
    this.inputContext = inputContext;
    this.conf = conf;
    this.localDirAllocator = localDirAllocator;
    this.exceptionReporter = exceptionReporter;
    this.initialMemoryAvailable = initialMemoryAvailable;

    this.combiner = combiner;

    this.reduceCombineInputCounter = reduceCombineInputCounter;
    this.spilledRecordsCounter = spilledRecordsCounter;
    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
    this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());

    this.localFS = localFS;
    // Keep a handle on the underlying raw file system; this cast requires
    // localFS to be a LocalFileSystem.
    this.rfs = ((LocalFileSystem) localFS).getRaw();

    this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
    this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
    this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
    this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);

    this.codec = codec;
    this.ifileReadAhead = ifileReadAheadEnabled;
    // The read-ahead length is only meaningful when read-ahead is enabled.
    this.ifileReadAheadLength = this.ifileReadAhead ? ifileReadAheadLength : 0;
    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);

    // Figure out initial memory req start
    final float maxInMemCopyUse =
        conf.getFloat(
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT_DEFAULT);
    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
      throw new IllegalArgumentException("Invalid value for " +
          TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
          maxInMemCopyUse);
    }

    // Allow unit tests to fix Runtime memory. Task memory is capped at
    // Integer.MAX_VALUE before the percentage is applied.
    long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
        Math.min(inputContext.getTotalMemoryAvailableToTask(), Integer.MAX_VALUE)) * maxInMemCopyUse);

    float maxRedPer = conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
        TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
      throw new TezUncheckedException("Invalid value for "
          + TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT + ": " + maxRedPer);
    }
    // TODO maxRedBuffer should be a long.
    int maxRedBuffer = (int) Math.min(inputContext.getTotalMemoryAvailableToTask() * maxRedPer,
        Integer.MAX_VALUE);
    // Figure out initial memory req end

    // Never exceed the memory actually granted to this input.
    this.memoryLimit = Math.min(this.initialMemoryAvailable, memLimit);
    // The result is <= maxRedBuffer (an int), so the narrowing cast is lossless.
    this.postMergeMemLimit = (int) Math.min(this.initialMemoryAvailable, maxRedBuffer);

    LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
        + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ", Updated to: ShuffleMem="
        + this.memoryLimit + ", postMergeMem=" + this.postMergeMemLimit);

    this.ioSortFactor = 
        conf.getInt(
            TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 
            TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);

    final float singleShuffleMemoryLimitPercent =
        conf.getFloat(
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
    if (singleShuffleMemoryLimitPercent <= 0.0f
        || singleShuffleMemoryLimitPercent > 1.0f) {
      throw new IllegalArgumentException("Invalid value for "
          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
          + singleShuffleMemoryLimitPercent);
    }

    this.maxSingleShuffleLimit = 
        (long) (memoryLimit * singleShuffleMemoryLimitPercent);
    // Mem-to-mem merge threshold defaults to the sort factor when not configured.
    this.memToMemMergeOutputsThreshold = 
        conf.getInt(
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 
            ioSortFactor);
    this.mergeThreshold = 
        (long) (this.memoryLimit * 
            conf.getFloat(
                TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 
                TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
             "mergeThreshold=" + mergeThreshold + ", " + 
             "ioSortFactor=" + ioSortFactor + ", " +
             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);

    // A single shuffled output must fit below the merge trigger point.
    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
      throw new RuntimeException("Invalid configuration: "
          + "maxSingleShuffleLimit should be less than mergeThreshold. "
          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
          + ", mergeThreshold: " + this.mergeThreshold);
    }

    boolean allowMemToMemMerge = 
        conf.getBoolean(
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, 
            TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
    if (allowMemToMemMerge) {
      this.memToMemMerger = 
          new IntermediateMemoryToMemoryMerger(this,
                                               memToMemMergeOutputsThreshold);
    } else {
      this.memToMemMerger = null;
    }

    this.inMemoryMerger = new InMemoryMerger(this);

    this.onDiskMerger = new OnDiskMerger(this);
  }