in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java [310:411]
/**
 * Flushes all remaining in-memory data: ends and sorts the current span,
 * spills it, then merges every spill file into a single final output file
 * plus its index file, deleting the intermediate spills afterwards.
 *
 * Fast path: with exactly one spill, the spill and index files are promoted
 * in place via a same-volume rename instead of a merge pass.
 *
 * Fixes over the previous version: {@code finalOut} and the per-partition
 * {@code Writer} are now closed in {@code finally} blocks so an exception
 * during merge/write no longer leaks the open streams; the loop-invariant
 * merge-factor config lookup is hoisted out of the partition loop.
 *
 * @throws IOException if reading a spill index, merging, or writing the
 *                     final output/index file fails
 */
public void flush() throws IOException {
  final String uniqueIdentifier = outputContext.getUniqueIdentifier();
  Path finalOutputFile =
      mapOutputFile.getOutputFileForWrite(0); //TODO
  Path finalIndexFile =
      mapOutputFile.getOutputIndexFileForWrite(0); //TODO
  LOG.info("Starting flush of map output");
  span.end();
  merger.add(span.sort(sorter, comparator));
  spill();
  sortmaster.shutdown();
  largeBuffer = null;
  if (numSpills == 1) {
    // someday be able to pass this directly to shuffle
    // without writing to disk
    final Path filename =
        mapOutputFile.getSpillFile(0);
    Path indexFilename =
        mapOutputFile.getSpillIndexFile(0);
    sameVolRename(filename, mapOutputFile.getOutputFileForWriteInVolume(filename));
    sameVolRename(indexFilename, mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename));
    return;
  }

  // Loop-invariant: read the merge factor once instead of once per partition.
  final int mergeFactor =
      this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
          TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);

  // The output stream for the final single output file
  FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
  try {
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    final ArrayList<TezSpillRecord> indexCacheList =
        new ArrayList<TezSpillRecord>(numSpills);
    for (int i = 0; i < numSpills; i++) {
      // TODO: build this cache before
      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
      TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
      indexCacheList.add(spillIndex);
    }
    for (int parts = 0; parts < partitions; parts++) {
      // create the segments to be merged: one segment per spill for this partition
      List<Segment> segmentList =
          new ArrayList<Segment>(numSpills);
      for (int i = 0; i < numSpills; i++) {
        Path spillFilename = mapOutputFile.getSpillFile(i);
        TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
        Segment s =
            new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
                indexRecord.getPartLength(), codec, ifileReadAhead,
                ifileReadAheadLength, ifileBufferSize, true);
        segmentList.add(i, s);
      }
      // sort the segments only if there are intermediate merges
      boolean sortSegments = segmentList.size() > mergeFactor;
      // merge
      TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
          keyClass, valClass, codec,
          segmentList, mergeFactor,
          new Path(uniqueIdentifier),
          (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
          nullProgressable, sortSegments, true,
          null, spilledRecordsCounter, null,
          null); // Not using any Progress in TezMerger. Should just work.
      // write merged output to disk
      long segmentStart = finalOut.getPos();
      Writer writer =
          new Writer(conf, finalOut, keyClass, valClass, codec,
              spilledRecordsCounter, null, merger.needsRLE());
      try {
        if (combiner == null || numSpills < minSpillsForCombine) {
          TezMerger.writeFile(kvIter, writer, nullProgressable,
              TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
        } else {
          runCombineProcessor(kvIter, writer);
        }
      } finally {
        // close the per-partition writer even if the merge/combine throws,
        // so the underlying stream state is not left dangling
        writer.close();
      }
      // record offsets of this partition's segment in the final index
      final TezIndexRecord rec =
          new TezIndexRecord(
              segmentStart,
              writer.getRawLength(),
              writer.getCompressedLength());
      spillRec.putIndex(rec, parts);
    }
    spillRec.writeToFile(finalIndexFile, conf);
  } finally {
    // leak fix: previously finalOut stayed open when any merge step threw
    finalOut.close();
  }
  // all spills are merged into the final file; remove the intermediates
  for (int i = 0; i < numSpills; i++) {
    Path indexFilename = mapOutputFile.getSpillIndexFile(i);
    Path spillFilename = mapOutputFile.getSpillFile(i);
    rfs.delete(indexFilename, true);
    rfs.delete(spillFilename, true);
  }
}