in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java [553:741]
/**
 * Merges the segments currently held in {@code segments}, one pass at a time,
 * until the remaining segments fit into a single pass, and returns an iterator
 * over the merged result.
 *
 * <p>Every non-final pass takes up to {@code factor} non-empty segments, writes
 * their merged output to a temporary file named {@code intermediate.N} (N being
 * the pass number) resolved against {@code tmpDir} through {@code lDirAlloc},
 * and inserts that file back into {@code segments} as a new segment at the
 * position dictated by {@code segmentComparator}. The final pass only loads the
 * chosen segments into the priority queue and returns {@code this}; no file is
 * written for it.
 *
 * <p>Side effects visible in this method: {@code segments} is modified,
 * {@code mergeProgress}, {@code progPerByte} and {@code totalBytesProcessed}
 * are updated, and empty segments are closed.
 *
 * @param keyClass key class handed to the {@code Writer} of intermediate files
 * @param valueClass value class handed to the {@code Writer} of intermediate files
 * @param factor requested merge factor; the per-pass value is derived from it
 *        via {@code getPassFactor} and it is reset to this original value
 *        after every pass
 * @param inMem number of in-memory segments; they are assumed to be the first
 *        entries of {@code segments}, are excluded from the count given to
 *        {@code getPassFactor}, and are added on top of the first pass's factor
 * @param tmpDir parent path used to build the intermediate file names
 * @param readsCounter counter passed to {@code Segment.init}
 * @param writesCounter counter passed to the {@code Writer} of intermediate files
 * @param bytesReadCounter counter passed to {@code Segment.init}
 * @param mergePhase progress object to report into; when {@code null} the
 *        existing {@code mergeProgress} is kept
 * @return an {@code EmptyIterator} when there are no segments, otherwise
 *         {@code this} with the final pass's segments queued
 * @throws IOException declared by this method; presumably raised by segment
 *         reads or intermediate file writes (those helpers are not visible here)
 */
TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
int factor, int inMem, Path tmpDir,
TezCounter readsCounter,
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
throws IOException {
LOG.info("Merging " + segments.size() + " sorted segments");
if (segments.size() == 0) {
LOG.info("Nothing to merge. Returning an empty iterator");
return new EmptyIterator();
}
/*
 * If there are in-memory segments, they come first in the segments
 * list, followed by the sorted disk segments. Otherwise (if there are only
 * disk segments), they are sorted segments if there are more than
 * factor segments in the segments list.
 */
int numSegments = segments.size();
// Remember the caller's factor: 'factor' is overwritten per pass and restored
// from this value at the end of every pass.
int origFactor = factor;
int passNo = 1;
if (mergePhase != null) {
mergeProgress = mergePhase;
}
// Total bytes expected to flow through all passes; used to scale progress.
long totalBytes = computeBytesInMerges(factor, inMem);
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
//create the MergeStreams from the sorted map created in the constructor
//and dump the final output to a file
do {
//get the factor for this pass of merge. We assume in-memory segments
//are the first entries in the segment list and that the pass factor
//doesn't apply to them
factor = getPassFactor(factor, passNo, numSegments - inMem);
if (1 == passNo) {
// First pass only: in-memory segments ride along on top of the pass factor.
factor += inMem;
}
List<Segment> segmentsToMerge =
new ArrayList<Segment>();
int segmentsConsidered = 0;
int numSegmentsToConsider = factor;
long startBytes = 0; // starting bytes of segments of this merge
// Keep pulling candidates until 'factor' non-empty segments have been
// collected or the segment list is exhausted.
while (true) {
//extract the smallest 'factor' number of segments
//Call cleanup on the empty segments (no key/value data)
// NOTE(review): getSegmentDescriptors presumably removes the returned
// segments from 'segments' (the size()==0 exit below relies on the list
// shrinking) - confirm against its definition.
List<Segment> mStream =
getSegmentDescriptors(numSegmentsToConsider);
for (Segment segment : mStream) {
// Initialize the segment at the last possible moment;
// this helps in ensuring we don't use buffers until we need them
segment.init(readsCounter, bytesReadCounter);
// Measure how far the segment's position advances while reading its first
// key; that delta is accounted as already-processed bytes for this pass.
long startPos = segment.getPosition();
boolean hasNext = segment.nextRawKey();
long endPos = segment.getPosition();
if (hasNext) {
startBytes += endPos - startPos;
segmentsToMerge.add(segment);
segmentsConsidered++;
}
else { // Empty segments. Can this be avoided altogether ?
segment.close();
numSegments--; //we ignore this segment for the merge
}
}
//if we have the desired number of segments
//or looked at all available segments, we break
if (segmentsConsidered == factor ||
segments.size() == 0) {
break;
}
// Get the correct # of segments in case some of them were empty.
numSegmentsToConsider = factor - segmentsConsidered;
}
//feed the streams to the priority queue
initialize(segmentsToMerge.size());
clear();
for (Segment segment : segmentsToMerge) {
put(segment);
}
//if we have lesser number of segments remaining, then just return the
//iterator, else do another single level merge
if (numSegments <= factor) { // Will always kick in if only in-mem segments are provided.
if (!considerFinalMergeForProgress) { // for reduce task
// Reset totalBytesProcessed and recalculate totalBytes from the
// remaining segments to track the progress of the final merge.
// Final merge is considered as the progress of the reducePhase,
// the 3rd phase of reduce task.
totalBytesProcessed = 0;
totalBytes = 0;
for (int i = 0; i < segmentsToMerge.size(); i++) {
totalBytes += segmentsToMerge.get(i).getLength();
}
}
if (totalBytes != 0) //being paranoid
progPerByte = 1.0f / (float)totalBytes;
// Count the bytes consumed while priming each segment's first key.
totalBytesProcessed += startBytes;
if (totalBytes != 0)
mergeProgress.set(totalBytesProcessed * progPerByte);
else
mergeProgress.set(1.0f); // Last pass and no segments left - we're done
LOG.info("Down to the last merge-pass, with " + numSegments +
" segments left of total size: " +
(totalBytes - totalBytesProcessed) + " bytes");
// At this point, the 'factor' segments have not been physically
// materialized. The merge will be done dynamically. Some of them may
// be in-memory segments, others on-disk segments. Whether a final merge
// to disk is required is a decision left to the caller.
return this;
} else {
LOG.info("Merging " + segmentsToMerge.size() +
" intermediate segments out of a total of " +
(segments.size()+segmentsToMerge.size()));
// Snapshot so the bytes attributed to this pass can be derived afterwards.
long bytesProcessedInPrevMerges = totalBytesProcessed;
totalBytesProcessed += startBytes;
//we want to spread the creation of temp files on multiple disks if
//available under the space constraints
// Size hint for the allocator: data length plus approximate checksum
// overhead of every input segment.
long approxOutputSize = 0;
for (Segment s : segmentsToMerge) {
approxOutputSize += s.getLength() +
ChecksumFileSystem.getApproxChkSumLength(
s.getLength());
}
Path tmpFilename =
new Path(tmpDir, "intermediate").suffix("." + passNo);
Path outputFile = lDirAlloc.getLocalPathForWrite(
tmpFilename.toString(),
approxOutputSize, conf);
// TODO Would it ever make sense to make this an in-memory writer ?
// Merging because of too many disk segments - might fit in memory.
// NOTE(review): writer.close() is not in a finally block, so the writer
// is left open if writeFile throws - consider whether that leak matters.
Writer writer =
new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
writesCounter, null);
writeFile(this, writer, reporter, recordsBeforeProgress);
writer.close();
//we finished one single level merge; now clean up the priority
//queue
this.close();
// Add the newly created segment to the list of segments to be merged
Segment tempSegment =
new Segment(conf, fs, outputFile, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, false);
// Insert new merged segment into the sorted list
int pos = Collections.binarySearch(segments, tempSegment,
segmentComparator);
if (pos < 0) {
// binary search failed. So position to be inserted at is -pos-1
pos = -pos-1;
}
segments.add(pos, tempSegment);
numSegments = segments.size();
// Subtract the difference between expected size of new segment and
// actual size of new segment (expected size of new segment is
// inputBytesOfThisMerge) from totalBytes. Expected size and actual
// size will match (almost) if combiner is not called in merge.
long inputBytesOfThisMerge = totalBytesProcessed -
bytesProcessedInPrevMerges;
totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
passNo++;
}
//we are worried about only the first pass merge factor. So reset the
//factor to what it originally was
factor = origFactor;
} while(true);
}