client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java [178:230]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    LOG.info("Initialized WriteBufferManager.");
  }

  /**
   * Flushes this sorter by delegating to {@code bufferManager.waitSendFinished()}.
   *
   * <p>NOTE(review): judging by the callee's name, this waits until pending sends have completed
   * — confirm against the buffer manager implementation.
   *
   * @throws IOException if the flush fails (TODO confirm which callee raises it)
   */
  @Override
  public void flush() throws IOException {
    this.bufferManager.waitSendFinished();
  }

  /**
   * Closes this sorter and releases everything held by the buffer manager.
   *
   * <p>{@code bufferManager.freeAllResources()} runs in a {@code finally} block so that the
   * buffer manager's resources are released even when {@code super.close()} throws.
   *
   * @throws IOException if {@code super.close()} fails
   */
  @Override
  public final void close() throws IOException {
    try {
      super.close();
    } finally {
      // Must run regardless of the outcome above, otherwise a failed close leaks the buffers.
      bufferManager.freeAllResources();
    }
  }

  /**
   * Writes a single key/value record.
   *
   * <p>The target partition is obtained from {@code partitioner.getPartition(key, value,
   * partitions)} and the record is then handed to {@link #collect(Object, Object, int)}.
   *
   * @param key the record key
   * @param value the record value
   * @throws IOException if {@code collect} rejects the record
   * @throws RssException if the calling thread is interrupted while collecting; the thread's
   *     interrupt status is restored before this exception is thrown
   */
  @Override
  public void write(Object key, Object value) throws IOException {
    try {
      collect(key, value, partitioner.getPartition(key, value, partitions));
    } catch (InterruptedException e) {
      // Catching InterruptedException clears the interrupt flag; restore it so that code further
      // up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RssException(e);
    }
  }

  /**
   * Validates one record and hands it to the buffer manager for the given partition.
   *
   * <p>The checks run in this order: exact runtime class of the key against {@code keyClass},
   * exact runtime class of the value against {@code valClass}, then the partition index against
   * the range {@code [0, partitions)}. On success the record is added through {@code
   * bufferManager.addRecord} and the per-partition record counter is incremented. The method is
   * {@code synchronized} on this instance.
   *
   * @param key the record key; must be non-null ({@code getClass()} is called on it)
   * @param value the record value; must be non-null ({@code getClass()} is called on it)
   * @param partition the index of the target partition
   * @throws IOException if the key or value class does not match, or the partition is out of
   *     range
   * @throws InterruptedException presumably raised while adding the record to the buffer manager
   *     — TODO confirm
   */
  synchronized void collect(Object key, Object value, final int partition)
      throws IOException, InterruptedException {
    final Class<?> actualKeyClass = key.getClass();
    if (actualKeyClass != keyClass) {
      final String keyMismatch =
          "Type mismatch in key from map: expected "
              + keyClass.getName()
              + ", received "
              + actualKeyClass.getName();
      throw new IOException(keyMismatch);
    }

    final Class<?> actualValueClass = value.getClass();
    if (actualValueClass != valClass) {
      final String valueMismatch =
          "Type mismatch in value from map: expected "
              + valClass.getName()
              + ", received "
              + actualValueClass.getName();
      throw new IOException(valueMismatch);
    }

    final boolean partitionInRange = partition >= 0 && partition < partitions;
    if (!partitionInRange) {
      throw new IOException("Illegal partition for " + key + " (" + partition + ")");
    }

    bufferManager.addRecord(partition, key, value);
    numRecordsPerPartition[partition] += 1;
  }

  /**
   * Returns the per-partition record counters that {@code collect} increments.
   *
   * <p>The backing array itself is returned, not a copy.
   *
   * @return the counter array, indexed by partition
   */
  public int[] getNumRecordsPerPartition() {
    return this.numRecordsPerPartition;
  }

  private boolean isMemoryShuffleEnabled(String storageType) {
    return StorageType.withMemory(StorageType.valueOf(storageType));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java [172:223]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    LOG.info("Initialized WriteBufferManager.");
  }

  /**
   * Flushes this sorter by delegating to {@code bufferManager.waitSendFinished()}.
   *
   * <p>NOTE(review): judging by the callee's name, this waits until pending sends have completed
   * — confirm against the buffer manager implementation.
   *
   * @throws IOException if the flush fails (TODO confirm which callee raises it)
   */
  @Override
  public void flush() throws IOException {
    this.bufferManager.waitSendFinished();
  }

  /**
   * Closes this sorter and releases everything held by the buffer manager.
   *
   * <p>{@code bufferManager.freeAllResources()} runs in a {@code finally} block so that the
   * buffer manager's resources are released even when {@code super.close()} throws.
   *
   * @throws IOException if {@code super.close()} fails
   */
  @Override
  public final void close() throws IOException {
    try {
      super.close();
    } finally {
      // Must run regardless of the outcome above, otherwise a failed close leaks the buffers.
      bufferManager.freeAllResources();
    }
  }

  /**
   * Writes a single key/value record.
   *
   * <p>The target partition is obtained from {@code partitioner.getPartition(key, value,
   * partitions)} and the record is then handed to {@link #collect(Object, Object, int)}.
   *
   * @param key the record key
   * @param value the record value
   * @throws IOException if {@code collect} rejects the record
   * @throws RssException if the calling thread is interrupted while collecting; the thread's
   *     interrupt status is restored before this exception is thrown
   */
  @Override
  public void write(Object key, Object value) throws IOException {
    try {
      collect(key, value, partitioner.getPartition(key, value, partitions));
    } catch (InterruptedException e) {
      // Catching InterruptedException clears the interrupt flag; restore it so that code further
      // up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RssException(e);
    }
  }

  /**
   * Validates one record and hands it to the buffer manager for the given partition.
   *
   * <p>The checks run in this order: exact runtime class of the key against {@code keyClass},
   * exact runtime class of the value against {@code valClass}, then the partition index against
   * the range {@code [0, partitions)}. On success the record is added through {@code
   * bufferManager.addRecord} and the per-partition record counter is incremented. The method is
   * {@code synchronized} on this instance.
   *
   * @param key the record key; must be non-null ({@code getClass()} is called on it)
   * @param value the record value; must be non-null ({@code getClass()} is called on it)
   * @param partition the index of the target partition
   * @throws IOException if the key or value class does not match, or the partition is out of
   *     range
   * @throws InterruptedException presumably raised while adding the record to the buffer manager
   *     — TODO confirm
   */
  synchronized void collect(Object key, Object value, final int partition)
      throws IOException, InterruptedException {
    final Class<?> receivedKeyType = key.getClass();
    if (receivedKeyType != keyClass) {
      final String keyError =
          "Type mismatch in key from map: expected "
              + keyClass.getName()
              + ", received "
              + receivedKeyType.getName();
      throw new IOException(keyError);
    }

    final Class<?> receivedValueType = value.getClass();
    if (receivedValueType != valClass) {
      final String valueError =
          "Type mismatch in value from map: expected "
              + valClass.getName()
              + ", received "
              + receivedValueType.getName();
      throw new IOException(valueError);
    }

    final boolean validPartition = partition >= 0 && partition < partitions;
    if (!validPartition) {
      throw new IOException("Illegal partition for " + key + " (" + partition + ")");
    }

    bufferManager.addRecord(partition, key, value);
    numRecordsPerPartition[partition] += 1;
  }

  /**
   * Returns the per-partition record counters that {@code collect} increments.
   *
   * <p>The backing array itself is returned, not a copy.
   *
   * @return the counter array, indexed by partition
   */
  public int[] getNumRecordsPerPartition() {
    return this.numRecordsPerPartition;
  }

  private boolean isMemoryShuffleEnabled(String storageType) {
    return StorageType.withMemory(StorageType.valueOf(storageType));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



