public PipelinedSorter()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java [135:247]


  /**
   * Creates a pipelined sorter and prepares its sort buffers, initial span,
   * merger and sorter thread pool.
   *
   * <p>Setup proceeds in this order: resolve the minimum block size, derive the
   * partition bit width and shuffle mode, plan how the available sort memory is
   * split into blocks, allocate the first block (and all remaining blocks when
   * lazy allocation is disabled), then create the first {@code SortSpan}, the
   * {@code SpanMerger} and the daemon sorter thread pool.
   *
   * @param outputContext output context; used here for vertex names in the log
   *        line and in sorter thread names, and forwarded to the superclass
   * @param conf configuration; forwarded to the superclass and read here for
   *        the sorter settings
   * @param numOutputs forwarded to the superclass constructor
   * @param initialMemoryAvailable forwarded to the superclass constructor
   * @throws IOException if an I/O error is raised by the calls made during setup
   * @throws IllegalArgumentException if lazy allocation is disabled and the
   *         configured minimum block size (in MB) is not greater than 0 and
   *         less than 2047
   * @throws IllegalStateException if no buffer is present after allocation
   */
  public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
      long initialMemoryAvailable) throws IOException {
    super(outputContext, conf, numOutputs, initialMemoryAvailable);

    lazyAllocateMem = this.conf.getBoolean(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT);

    if (lazyAllocateMem) {
      // When lazy-allocation is enabled, the framework allocates memory on a
      // need basis, so the desirable block size is fixed rather than configured.
      // 256 MB - 64 bytes. See comment for the 32MB allocation.
      MIN_BLOCK_SIZE = ((256 << 20) - 64);
    } else {
      int minBlockSize = conf.getInt(TezRuntimeConfiguration
              .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
          TezRuntimeConfiguration
              .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT);
      // The upper bound keeps (minBlockSize << 20) within the positive int range.
      Preconditions.checkArgument(
          (minBlockSize > 0 && minBlockSize < 2047),
          TezRuntimeConfiguration
              .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB
              + "=" + minBlockSize + " should be a positive value between 0 and 2047");
      // Configured value is in MB; convert to bytes.
      MIN_BLOCK_SIZE = minBlockSize << 20;
    }

    StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
        .append(outputContext.getInputOutputVertexNames()).append(": ");
    partitionBits = bitcount(partitions)+1;

    boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);

    // Pipelined shuffle is only used when the final merge is disabled.
    pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
    auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);

    // Memory budget for the sort buffers: MB from the superclass, converted to bytes.
    final long sortmb = this.availableMemoryMb;
    long maxMemLimit = sortmb << 20;

    // The prefix already ends with ": ", so the first field takes no leading separator.
    initialSetupLogLine.append("UsingHashComparator=");
    // k/v serialization
    if(comparator instanceof ProxyComparator) {
      hasher = (ProxyComparator)comparator;
      initialSetupLogLine.append(true);
    } else {
      hasher = null;
      initialSetupLogLine.append(false);
    }

    // Logged before any buffer is allocated; the full line is logged again below
    // once the block layout is known.
    LOG.info(initialSetupLogLine.toString());

    // Plan the block layout: walk through the memory budget block by block to
    // count the blocks and sum the capacity that remains after trimming each
    // block down to a multiple of METASIZE.
    long totalCapacityWithoutMeta = 0;
    long availableMem = maxMemLimit;
    int numBlocks = 0;
    while(availableMem > 0) {
      long size = Math.min(availableMem, computeBlockSize(availableMem, maxMemLimit));
      int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
      totalCapacityWithoutMeta += sizeWithoutMeta;
      availableMem -= size;
      numBlocks++;
    }
    currentAllocatableMemory = maxMemLimit;
    maxNumberOfBlocks = numBlocks;
    capacity = totalCapacityWithoutMeta;

    buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
    bufferUsage = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
    allocateSpace(); //Allocate the first block
    if (!lazyAllocateMem) {
      LOG.info("Pre allocating rest of memory buffers upfront");
      while (allocateSpace() != null) {
        // keep allocating until allocateSpace() returns null
      }
    }

    // Validate before the first buffers.get(0) below, so that an empty buffer
    // list fails with this message instead of an IndexOutOfBoundsException.
    Preconditions.checkState(buffers.size() > 0, "Atleast one buffer needs to be present");

    initialSetupLogLine.append(", #blocks=").append(maxNumberOfBlocks);
    initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit);
    initialSetupLogLine.append(", lazyAllocateMem=").append(
        lazyAllocateMem);
    initialSetupLogLine.append(", minBlockSize=").append(MIN_BLOCK_SIZE);
    initialSetupLogLine.append(", initial BLOCK_SIZE=").append(buffers.get(0).capacity());
    initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
    initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
    initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
    initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
        "=").append(
        sortmb);

    LOG.info(initialSetupLogLine.toString());

    span = new SortSpan(buffers.get(bufferIndex), 1024 * 1024, 16, this.comparator);
    merger = new SpanMerger(); // SpanIterators are comparable
    final int sortThreads = 
            this.conf.getInt(
                TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS,
                TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
    // Daemon threads, named after the source and destination vertices.
    sortmaster = Executors.newFixedThreadPool(sortThreads,
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
                + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
            .build());

    // Both serializers write into the current span's output stream.
    valSerializer.open(span.out);
    keySerializer.open(span.out);
    minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
    deflater = TezCommonUtils.newBestCompressionDeflater();
    finalEvents = Lists.newLinkedList();
  }