in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java [665:860]
public void flush() throws IOException {
final String uniqueIdentifier = outputContext.getUniqueIdentifier();
outputContext.notifyProgress();
/*
* The thread may have been interrupted while flush was in progress, or flush may never
* have been invoked. TezTaskRunner invokes close() on all I/O as part of its cleanup,
* at which point it is safe to clean up here.
*/
if (isThreadInterrupted()) {
return;
}
try {
LOG.info(outputContext.getInputOutputVertexNames() + ": Starting flush of map output");
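// Finish the in-flight sort span and hand its sorted output to the merger
// before forcing the final spill below.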
span.end();
merger.add(span.sort(sorter));
// Force a spill in flush():
// case 1: no keys were written before flush was called, but we still want
// at least one spill (even an empty one).
// case 2: with pipelined shuffle there is no way to know that the last key
// has been written until flush is called, so the spill is forced here in
// order to send the pipelined shuffle event with lastEvent=true.
// spill(false) below forces the spill even when it would be empty.
spill(false);
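// All in-memory sorting is done; shut down sortmaster, the executor that ran the span sorts.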
sortmaster.shutdown();
//safe to clean up
buffers.clear();
if (indexCacheList.isEmpty()) {
/*
* Without this check, a task that gets killed mid-flush can throw an NPE here,
* which is a needless distraction when debugging.
*/
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getInputOutputVertexNames()
+ ": Index list is empty... returning");
}
return;
}
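// Without a final merge, every spill is shipped to consumers as-is; only the
// spill event(s) that have not been sent yet are generated here.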
if (!isFinalMergeEnabled()) {
// For pipelined shuffle, events for earlier spills were already sent as they
// happened; only the last event needs to be generated here.
int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
int endIndex = numSpills;
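// Spill i was written under the path component "<uniqueIdentifier>_i"; emit one
// event per unsent spill and mark the last one as final.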
for (int i = startIndex; i < endIndex; i++) {
boolean isLastEvent = (i == numSpills - 1);
String pathComponent = (uniqueIdentifier + "_" + i);
ShuffleUtils.generateEventOnSpill(finalEvents, isFinalMergeEnabled(), isLastEvent,
outputContext, i, indexCacheList.get(i), partitions,
sendEmptyPartitionDetails, pathComponent, partitionStats,
reportDetailedPartitionStats(), auxiliaryService, deflater);
LOG.info(outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update="
+ isLastEvent + "), spillId=" + i);
}
return;
}
// From here on, a final merge is required. Every spill beyond the first counts as
// an additional spill that the final merge must read back.
numAdditionalSpills.increment(numSpills - 1);
if (numSpills == 1) {
// Only one spill: its file already holds the final output, so rename it into place
// (sameVolRename assumes source and destination are on the same volume) instead of
// copying. TODO: someday this could be handed directly to shuffle without touching disk.
final Path filename = spillFilePaths.get(0);
final Path indexFilename = spillFileIndexPaths.get(0);
finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
sameVolRename(filename, finalOutputFile);
sameVolRename(indexFilename, finalIndexFile);
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getInputOutputVertexNames() + ": numSpills=" + numSpills +
", finalOutputFile=" + finalOutputFile + ", "
+ "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
indexFilename);
}
TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, localFs);
if (reportPartitionStats()) {
for (int i = 0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getRawLength();
}
}
numShuffleChunks.setValue(numSpills);
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
// TODO: why are no events sent here?
return;
}
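// Multiple spills with final merge enabled: merge them, partition by partition,
// into a single output file described by a single index file.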
finalOutputFile = mapOutputFile.getOutputFileForWrite(0); //TODO
finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(0); //TODO
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getInputOutputVertexNames() + ": " +
"numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
+ finalIndexFile);
}
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
ensureSpillFilePermissions(finalOutputFile, rfs);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
boolean shouldWrite = false;
//create the segments to be merged
List<Segment> segmentList = new ArrayList<Segment>(numSpills);
for (int i = 0; i < numSpills; i++) {
Path spillFilename = spillFilePaths.get(i);
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
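// Include this spill's segment if it has data. When empty partitions are not
// reported via events, empty segments are included as well so that the final
// file still contains an entry for every partition.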
if (indexRecord.hasData() || !sendEmptyPartitionDetails) {
shouldWrite = true;
DiskSegment s =
new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(),
indexRecord.getPartLength(), codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, true);
segmentList.add(s);
}
}
int mergeFactor =
this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
// sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
serializationContext, codec,
segmentList, mergeFactor,
new Path(uniqueIdentifier),
(RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
progressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null, merger.needsRLE()); // no Progress instance is passed to TezMerger; it works without one
//write merged output to disk
long segmentStart = finalOut.getPos();
long rawLength = 0;
long partLength = 0;
if (shouldWrite) {
Writer writer = new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), finalOut,
serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
spilledRecordsCounter, null, merger.needsRLE());
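// Run the combiner only if one is configured and enough spills accumulated
// (minSpillsForCombine); otherwise write the merged records straight through.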
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer, progressable,
TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
runCombineProcessor(kvIter, writer);
}
// close the writer so raw/compressed lengths are finalized before reading them
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
outputBytesWithOverheadCounter.increment(rawLength);
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
partitionStats[parts] += rawLength;
}
}
numShuffleChunks.setValue(1); // the final merge produced a single output chunk
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
spillRec.writeToFile(finalIndexFile, conf, localFs);
finalOut.close();
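// The merged output and index are in place; the intermediate per-spill files
// are no longer needed.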
for (int i = 0; i < numSpills; i++) {
Path indexFilename = spillFileIndexPaths.get(i);
Path spillFilename = spillFilePaths.get(i);
rfs.delete(indexFilename, true);
rfs.delete(spillFilename, true);
}
spillFileIndexPaths.clear();
spillFilePaths.clear();
} catch (InterruptedException ie) {
if (cleanup) {
cleanup();
}
Thread.currentThread().interrupt();
throw new IOInterruptedException("Interrupted while closing Output", ie);
}
}