public UnorderedPartitionedKVWriter()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java [203:333]


  /**
   * Builds the writer: reads its settings from the configuration, sets up the first in-memory
   * buffer and the spill executor and, when there is a single partition and pipelined shuffle is
   * not in effect, opens the {@code IFile} writer up front and marks buffers as skipped.
   *
   * @param outputContext used here for the source/destination vertex names and the task counters
   * @param conf configuration the runtime settings are read from
   * @param numOutputs forwarded to the superclass and to the first {@code WrappedBuffer}
   * @param availableMemoryBytes memory budget in bytes; must be {@code >= 0}, and may be
   *                             {@code 0} only when there is a single partition and pipelined
   *                             shuffle is not in effect (both enforced via
   *                             {@code Preconditions.checkArgument})
   * @throws IOException propagated from the calls made during setup (for example obtaining the
   *                     local file system)
   */
  public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf,
      int numOutputs, long availableMemoryBytes) throws IOException {
    super(outputContext, conf, numOutputs);

    Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");

    this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
        + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
    //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
    // this case.  Add it later if needed.
    boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
    this.isFinalMergeEnabled = conf.getBoolean(
        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
    // Pipelined shuffle only takes effect when final merge is disabled.
    this.pipelinedShuffle = pipelinedShuffleConf && !isFinalMergeEnabled;
    this.finalEvents = Lists.newLinkedList();

    this.dataViaEventsEnabled = conf.getBoolean(
            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT);

    // No max cap on size (intentional)
    this.dataViaEventsMaxSize = conf.getInt(
            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE,
            TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT);

    boolean useCachedStreamConfig = conf.getBoolean(
        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE,
        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT);

    // Single partition without pipelined shuffle is the special case that drives the cached
    // stream decision, the zero-memory check and the skip-buffers path below; compute it once
    // so the three uses cannot drift apart.
    final boolean singlePartitionNoPipelining = (numPartitions == 1) && !pipelinedShuffle;

    this.useCachedStream = useCachedStreamConfig && this.dataViaEventsEnabled
        && singlePartitionNoPipelining;

    if (availableMemoryBytes == 0) {
      Preconditions.checkArgument(singlePartitionNoPipelining, "availableMemory "
          + "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration
          .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is disabled. current numPartitions=" +
          numPartitions + ", " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + "="
          + pipelinedShuffle);
    }

    // Ideally, should be significantly larger.
    availableMemory = availableMemoryBytes;

    // Allow unit tests to control the buffer sizes.
    int maxSingleBufferSizeBytes = conf.getInt(
        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
        Integer.MAX_VALUE);
    // Must run after availableMemory is assigned and before numBuffers/sizePerBuffer are used.
    computeNumBuffersAndSize(maxSingleBufferSizeBytes);

    availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
    buffers = new WrappedBuffer[numBuffers];
    // Set up only the first buffer to start with.
    buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
    numInitializedBuffers = 1;
    if (LOG.isDebugEnabled()) {
      LOG.debug(sourceDestNameTrimmed + ": " + "Initializing Buffer #" +
          numInitializedBuffers + " with size=" + sizePerBuffer);
    }
    currentBuffer = buffers[0];

    // Both serializers write into the same in-memory stream.
    baos = new ByteArrayOutputStream();
    dos = new NonSyncDataOutputStream(baos);
    keySerializer.open(dos);
    valSerializer.open(dos);
    rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();

    int maxThreads = Math.max(2, numBuffers/2);
    //TODO: Make use of TezSharedExecutor later
    ExecutorService executor = new ThreadPoolExecutor(1, maxThreads,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat(
                "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName(
                    outputContext.getDestinationVertexName()) + "} #%d")
            .build()
    );
    // to restrict submission of more tasks than threads (e.g numBuffers > numThreads)
    // This is maxThreads - 1, to avoid race between callback thread releasing semaphore and the
    // thread calling tryAcquire.
    availableSlots = new Semaphore(maxThreads - 1, true);
    spillExecutor = MoreExecutors.listeningDecorator(executor);

    numRecordsPerPartition = new int[numPartitions];
    reportPartitionStats = ReportPartitionStats.fromString(
        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
    // Per-partition sizes are only tracked when partition stats reporting is enabled.
    sizePerPartition = (reportPartitionStats.isEnabled()) ?
        new long[numPartitions] : null;

    outputLargeRecordsCounter = outputContext.getCounters().findCounter(
        TaskCounter.OUTPUT_LARGE_RECORDS);

    indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;

    if (singlePartitionNoPipelining) {
      //special case, where in only one partition is available.
      skipBuffers = true;
      if (this.useCachedStream) {
        writer = new IFile.FileBackedInMemIFileWriter(keySerialization, valSerialization, rfs,
            outputFileHandler, keyClass, valClass, codec, outputRecordsCounter,
            outputRecordBytesCounter, dataViaEventsMaxSize);
      } else {
        finalOutPath = outputFileHandler.getOutputFileForWrite();
        writer = new IFile.Writer(keySerialization, valSerialization, rfs, finalOutPath, keyClass, valClass,
            codec, outputRecordsCounter, outputRecordBytesCounter);
        ensureSpillFilePermissions(finalOutPath, rfs);
      }
    } else {
      skipBuffers = false;
      writer = null;
    }

    // One-line summary of the effective setup; each setting is reported exactly once.
    LOG.info(sourceDestNameTrimmed + ": "
        + "numBuffers=" + numBuffers
        + ", sizePerBuffer=" + sizePerBuffer
        + ", skipBuffers=" + skipBuffers
        + ", numPartitions=" + numPartitions
        + ", availableMemory=" + availableMemory
        + ", maxSingleBufferSizeBytes=" + maxSingleBufferSizeBytes
        + ", pipelinedShuffle=" + pipelinedShuffle
        + ", isFinalMergeEnabled=" + isFinalMergeEnabled
        + ", reportPartitionStats=" + reportPartitionStats
        + ", dataViaEventsEnabled=" + dataViaEventsEnabled
        + ", dataViaEventsMaxSize=" + dataViaEventsMaxSize
        + ", useCachedStreamConfig=" + useCachedStreamConfig
        + ", useCachedStream=" + useCachedStream
    );
  }