private void mergeAll()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java [509:591]


  /**
   * Produces the final output file and its index file by concatenating, partition by partition,
   * the records still held in {@code currentBuffer} and the matching segment of every spill listed
   * in {@code spillInfoList}.
   *
   * <p>For each partition with a non-zero entry in {@code numRecordsPerPartition}:
   * <ul>
   *   <li>A new IFile {@code Writer} is opened on the shared output stream.</li>
   *   <li>The partition's records from {@code currentBuffer} are written first, if any.</li>
   *   <li>The partition's segment from each spill follows, in {@code spillInfoList} iteration
   *       order. This runs while holding the {@code spillInfoList} monitor.</li>
   *   <li>The segment's start offset, raw length and compressed length are stored as that
   *       partition's index record.</li>
   * </ul>
   * Partitions with no records are skipped; no index record is put for them.
   *
   * <p>{@code fileOutputBytesCounter} is incremented by each written partition's compressed
   * length, and finally by the estimated index file size.
   *
   * @throws IOException if creating, reading or writing any of the involved files fails
   */
  private void mergeAll() throws IOException {
    // Size hint for the output file: everything already spilled plus an approximation of the
    // serialized size of the records still sitting in the current buffer.
    long expectedSize = spilledSize;
    if (currentBuffer.nextPosition != 0) {
      // Widen to long before multiplying so the estimate cannot overflow int arithmetic.
      expectedSize += currentBuffer.nextPosition - ((long) currentBuffer.numRecords * META_SIZE)
          - currentBuffer.skipSize + (long) numPartitions * APPROX_HEADER_LENGTH;
      // Update final statistics.
      updateGlobalStats(currentBuffer);
    }

    long indexFileSizeEstimate = (long) numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    Path finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
    Path finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);

    TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);

    // Scratch buffers for records taken from the in-memory buffer.
    DataInputBuffer keyBuffer = new DataInputBuffer();
    DataInputBuffer valBuffer = new DataInputBuffer();

    // Scratch buffers for records read back from spill files.
    DataInputBuffer keyBufferIFile = new DataInputBuffer();
    DataInputBuffer valBufferIFile = new DataInputBuffer();

    FSDataOutputStream out = null;
    try {
      out = rfs.create(finalOutPath);

      for (int i = 0; i < numPartitions; i++) {
        if (numRecordsPerPartition[i] == 0) {
          LOG.info("Skipping partition: " + i + " in final merge since it has no records");
          continue;
        }
        long segmentStart = out.getPos();
        Writer writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
        try {
          if (currentBuffer.nextPosition != 0
              && currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) {
            // Write current buffer.
            writePartition(currentBuffer.partitionPositions[i], currentBuffer, writer, keyBuffer,
                valBuffer);
          }
          synchronized (spillInfoList) {
            for (SpillInfo spillInfo : spillInfoList) {
              TezIndexRecord spillIndexRecord = spillInfo.spillRecord.getIndex(i);
              if (spillIndexRecord.getPartLength() == 0) {
                // Skip empty partitions within a spill
                continue;
              }
              FSDataInputStream in = rfs.open(spillInfo.outPath);
              boolean readerClosed = false;
              try {
                in.seek(spillIndexRecord.getStartOffset());
                IFile.Reader reader = new IFile.Reader(in, spillIndexRecord.getPartLength(), codec,
                    null, additionalSpillBytesReadCounter, ifileReadAhead, ifileReadAheadLength,
                    ifileBufferSize);
                while (reader.nextRawKey(keyBufferIFile)) {
                  // TODO Inefficient. If spills are not compressed, a direct copy should be
                  // possible given the current IFile format. Also extremely inefficient for large
                  // records, since the entire record will be read into memory.
                  reader.nextRawValue(valBufferIFile);
                  writer.append(keyBufferIFile, valBufferIFile);
                }
                // NOTE(review): the previous code relied on reader.close() to release 'in'
                // (it never closed 'in' itself) — confirm against IFile.Reader#close().
                reader.close();
                readerClosed = true;
              } finally {
                if (!readerClosed) {
                  // Failure before the reader was closed: release the spill file handle
                  // directly so it is not leaked.
                  in.close();
                }
              }
            }
          }
          writer.close();
          fileOutputBytesCounter.increment(writer.getCompressedLength());
          TezIndexRecord partitionIndexRecord = new TezIndexRecord(segmentStart,
              writer.getRawLength(), writer.getCompressedLength());
          // Mark as closed so the finally block does not close it a second time.
          writer = null;
          finalSpillRecord.putIndex(partitionIndexRecord, i);
        } finally {
          if (writer != null) {
            writer.close();
          }
        }
      }
    } finally {
      if (out != null) {
        out.close();
      }
    }
    finalSpillRecord.writeToFile(finalIndexPath, conf);
    fileOutputBytesCounter.increment(indexFileSizeEstimate);
    LOG.info("Finished final spill after merging : " + numSpills.get() + " spills");
  }