in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java [1152:1318]
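// Final merge: combines whatever map outputs remain in memory and on disk into
// a single sorted TezRawKeyValueIterator that is handed straight to the reduce
// phase, first spilling merged in-memory segments to disk where needed to
// vacate memory.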
private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
    List<MapOutput> inMemoryMapOutputs,
    List<FileChunk> onDiskMapOutputs) throws IOException, InterruptedException {
  logFinalMergeStart(inMemoryMapOutputs, onDiskMapOutputs);
  StringBuilder finalMergeLog = new StringBuilder();
  inputContext.notifyProgress();

  // merge config params
  SerializationContext serContext = new SerializationContext(job);
  final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
  final RawComparator comparator =
      (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(job);

  // segments required to vacate memory
  List<Segment> memDiskSegments = new ArrayList<Segment>();
  long inMemToDiskBytes = 0;
  boolean mergePhaseFinished = false;
  if (inMemoryMapOutputs.size() > 0) {
    int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier();
    inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
        memDiskSegments, this.postMergeMemLimit);
    final int numMemDiskSegments = memDiskSegments.size();
    if (numMemDiskSegments > 0 &&
        ioSortFactor > onDiskMapOutputs.size()) {
      // If we reach here, we have fewer than io.sort.factor disk segments, and
      // merging the in-memory segments adds exactly one more (the result of
      // the memory-segment merge). Since the total stays <= io.sort.factor, no
      // further intermediate merges are needed; the merge of all these disk
      // segments can be fed directly to the reduce method.
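      // e.g. with ioSortFactor = 100 and 40 on-disk segments, the single file
      // produced by the in-memory merge brings the total to 41 <= 100, so one
      // final merge pass suffices.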
      mergePhaseFinished = true;
      // must spill to disk, but can't retain in-mem for intermediate merge.
      // Can't use the spill id in the final merge as it could collide with
      // other spill files, hence Integer.MAX_VALUE
      final Path outputPath =
          mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
              inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
      final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, serContext,
          memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable,
          spilledRecordsCounter, null, additionalBytesRead, null);
      final Writer writer = new Writer(serContext.getKeySerialization(),
          serContext.getValSerialization(), fs, outputPath, serContext.getKeyClass(),
          serContext.getValueClass(), codec, null, null);
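      // Spill the merged in-memory segments to disk; if the write fails,
      // delete the partial output before rethrowing.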
      try {
        TezMerger.writeFile(rIter, writer, progressable,
            TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
      } catch (IOException e) {
        if (null != outputPath) {
          try {
            fs.delete(outputPath, true);
          } catch (IOException ie) {
            // ignore the cleanup failure; propagate the original exception
          }
        }
        throw e;
      } finally {
        if (null != writer) {
          writer.close();
          additionalBytesWritten.increment(writer.getCompressedLength());
        }
      }
      final FileStatus fStatus = localFS.getFileStatus(outputPath);
      // add to the list of final disk outputs.
      onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen()));
      if (LOG.isInfoEnabled()) {
        finalMergeLog.append("MemMerged: " + numMemDiskSegments + ", " + inMemToDiskBytes);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Merged " + numMemDiskSegments + " segments, size=" +
              inMemToDiskBytes + " to " + outputPath);
        }
      }
      inMemToDiskBytes = 0;
      memDiskSegments.clear();
    } else if (inMemToDiskBytes != 0) {
      if (LOG.isInfoEnabled()) {
        finalMergeLog.append("DelayedMemMerge: " + numMemDiskSegments + ", " + inMemToDiskBytes);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Keeping " + numMemDiskSegments + " segments, " +
              inMemToDiskBytes + " bytes in memory for " +
              "intermediate, on-disk merge");
        }
      }
    }
  }
  // segments on disk
  List<Segment> diskSegments = new ArrayList<Segment>();
  long onDiskBytes = inMemToDiskBytes;
  FileChunk[] onDisk = onDiskMapOutputs.toArray(new FileChunk[onDiskMapOutputs.size()]);
  for (FileChunk fileChunk : onDisk) {
    final long fileLength = fileChunk.getLength();
    onDiskBytes += fileLength;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Disk file=" + fileChunk.getPath() + ", len=" + fileLength +
          ", isLocal=" + fileChunk.isLocalFile());
    }
    final Path file = fileChunk.getPath();
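    // Files already produced by an intermediate merge (MERGED_OUTPUT_PREFIX)
    // get no counter here, presumably so mergedMapOutputsCounter counts each
    // original map output only once.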
    TezCounter counter =
        file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;
    final long fileOffset = fileChunk.getOffset();
    final boolean preserve = fileChunk.isLocalFile();
    diskSegments.add(new DiskSegment(fs, file, fileOffset, fileLength, codec, ifileReadAhead,
        ifileReadAheadLength, ifileBufferSize, preserve, counter));
  }
  if (LOG.isInfoEnabled()) {
    finalMergeLog.append(". DiskSeg: " + onDisk.length + ", " + onDiskBytes);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Merging " + onDisk.length + " files, " +
          onDiskBytes + " bytes from disk");
    }
  }
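  // Sort the disk segments by length, smallest first, so that any intermediate
  // merge passes combine the smaller segments and re-write as little data as
  // possible.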
  Collections.sort(diskSegments, new Comparator<Segment>() {
    @Override
    public int compare(Segment o1, Segment o2) {
      return Long.compare(o1.getLength(), o2.getLength());
    }
  });
  // build the final list of segments: the disk-backed merge result plus any
  // remaining in-memory segments
  List<Segment> finalSegments = new ArrayList<Segment>();
  long inMemBytes = createInMemorySegments(inMemoryMapOutputs, finalSegments, 0);
  if (LOG.isInfoEnabled()) {
    finalMergeLog.append(". MemSeg: " + finalSegments.size() + ", " + inMemBytes);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Merging " + finalSegments.size() + " segments, " +
          inMemBytes + " bytes from memory into reduce");
    }
  }
  if (0 != onDiskBytes) {
    final int numInMemSegments = memDiskSegments.size();
    diskSegments.addAll(0, memDiskSegments);
    memDiskSegments.clear();
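    // memDiskSegments is only non-empty in the delayed-merge case above; those
    // in-memory segments are placed at the front and their count passed to
    // TezMerger.merge, presumably so the first merge pass must consume them
    // and memory is freed early.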
    TezRawKeyValueIterator diskMerge = TezMerger.merge(
        job, fs, serContext, codec, diskSegments,
        ioSortFactor, numInMemSegments, tmpDir, comparator,
        progressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
    diskSegments.clear();
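    // If nothing remains in memory, the disk-merge iterator itself feeds the
    // reduce; otherwise wrap it as a single segment and merge it with the
    // in-memory segments below.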
    if (0 == finalSegments.size()) {
      return diskMerge;
    }
    finalSegments.add(new Segment(
        new RawKVIteratorReader(diskMerge, onDiskBytes), null));
  }
  if (LOG.isInfoEnabled()) {
    LOG.info(finalMergeLog.toString());
  }
  // This final merge does nothing but create an iterator over the segments;
  // no additional intermediate merge passes happen here.
  return TezMerger.merge(job, fs, serContext, codec,
      finalSegments, finalSegments.size(), tmpDir,
      comparator, progressable, spilledRecordsCounter, null,
      additionalBytesRead, null);
}