in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java [509:591]
/**
 * Produces the final output of this writer. It writes a single data file that contains, for
 * every partition, the records still held in {@code currentBuffer} followed by that
 * partition's records from each spill in {@code spillInfoList}. It also writes an index file
 * recording where each partition's segment starts in the data file, along with its raw and
 * compressed lengths.
 *
 * <p>Partitions whose {@code numRecordsPerPartition} entry is 0 are not written, and no index
 * record is explicitly stored for them. NOTE(review): their index entry is whatever
 * {@code new TezSpillRecord(numPartitions)} initializes it to. Confirm that consumers treat
 * that as an empty partition.
 *
 * <p>{@code fileOutputBytesCounter} is incremented by each written partition's compressed
 * length and by the index file size estimate.
 *
 * @throws IOException if creating or writing the final output, or reading a spill, fails
 */
private void mergeAll() throws IOException {
  long expectedSize = spilledSize;
  if (currentBuffer.nextPosition != 0) {
    // Widen to long before multiplying so the size estimate cannot overflow int arithmetic.
    expectedSize += currentBuffer.nextPosition - ((long) currentBuffer.numRecords * META_SIZE)
        - currentBuffer.skipSize + (long) numPartitions * APPROX_HEADER_LENGTH;
    // Update final statistics.
    updateGlobalStats(currentBuffer);
  }
  long indexFileSizeEstimate = (long) numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
  Path finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
  Path finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
  TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);
  DataInputBuffer keyBuffer = new DataInputBuffer();
  DataInputBuffer valBuffer = new DataInputBuffer();
  DataInputBuffer keyBufferIFile = new DataInputBuffer();
  DataInputBuffer valBufferIFile = new DataInputBuffer();
  FSDataOutputStream out = null;
  try {
    out = rfs.create(finalOutPath);
    for (int i = 0; i < numPartitions; i++) {
      if (numRecordsPerPartition[i] == 0) {
        LOG.info("Skipping partition: " + i + " in final merge since it has no records");
        continue;
      }
      // Each written partition is one IFile segment starting at the current output position.
      long segmentStart = out.getPos();
      Writer writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
      try {
        if (currentBuffer.nextPosition != 0
            && currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) {
          // Write current buffer.
          writePartition(currentBuffer.partitionPositions[i], currentBuffer, writer, keyBuffer,
              valBuffer);
        }
        appendSpilledPartitionRecords(i, writer, keyBufferIFile, valBufferIFile);
        writer.close();
        fileOutputBytesCounter.increment(writer.getCompressedLength());
        TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
            writer.getCompressedLength());
        // Cleared so the finally block does not close the writer a second time.
        writer = null;
        finalSpillRecord.putIndex(indexRecord, i);
      } finally {
        if (writer != null) {
          writer.close();
        }
      }
    }
  } finally {
    if (out != null) {
      out.close();
    }
  }
  finalSpillRecord.writeToFile(finalIndexPath, conf);
  fileOutputBytesCounter.increment(indexFileSizeEstimate);
  LOG.info("Finished final spill after merging : " + numSpills.get() + " spills");
}

/**
 * Appends to {@code writer} every record of {@code partition} that was spilled to disk, in the
 * order of {@code spillInfoList}. Spills whose index record reports a zero part length for the
 * partition are skipped. The {@code spillInfoList} monitor is held while iterating.
 *
 * <p>Each spill's input stream is released even if seeking or reading fails part-way.
 *
 * @param partition partition index to copy
 * @param writer destination for the records
 * @param keyBuffer scratch buffer reused for raw keys
 * @param valBuffer scratch buffer reused for raw values
 * @throws IOException if opening, reading, or appending fails
 */
private void appendSpilledPartitionRecords(int partition, Writer writer,
    DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException {
  synchronized (spillInfoList) {
    for (SpillInfo spillInfo : spillInfoList) {
      TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(partition);
      if (indexRecord.getPartLength() == 0) {
        // Skip empty partitions within a spill
        continue;
      }
      FSDataInputStream in = rfs.open(spillInfo.outPath);
      IFile.Reader reader = null;
      try {
        in.seek(indexRecord.getStartOffset());
        reader = new IFile.Reader(in, indexRecord.getPartLength(), codec, null,
            additionalSpillBytesReadCounter, ifileReadAhead, ifileReadAheadLength,
            ifileBufferSize);
        while (reader.nextRawKey(keyBuffer)) {
          // TODO Inefficient. If spills are not compressed, a direct copy should be possible
          // given the current IFile format. Also extremely inefficient for large records,
          // since the entire record will be read into memory.
          reader.nextRawValue(valBuffer);
          writer.append(keyBuffer, valBuffer);
        }
      } finally {
        if (reader != null) {
          // No separate in.close() is issued once the reader exists, so reader.close() is
          // relied on to release the underlying stream (TODO confirm against IFile.Reader).
          reader.close();
        } else {
          // The reader was never constructed, so nothing else would close the stream.
          in.close();
        }
      }
    }
  }
}