in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java [882:991]
/**
 * Spills the buffered records with metadata indices {@code [mstart, mend)} to a new spill file.
 *
 * <p>Partitions are visited in order {@code 0 .. partitions-1}. For each partition one IFile
 * segment is written, and its start offset and raw/compressed lengths are recorded in a
 * {@link TezSpillRecord}. Records for partition {@code i} are consumed while their PARTITION
 * field equals {@code i}, so the range is expected to be grouped by ascending partition
 * (presumably sorted by the caller). When a combiner is configured, each non-empty partition
 * is run through it instead of being appended directly. When {@code sendEmptyPartitionDetails}
 * is set, no writer is created for an empty partition and its index entry has zero lengths.
 *
 * <p>The index is written to disk once {@code totalIndexCacheMemory} has reached
 * {@code indexCacheMemoryLimit}; otherwise it is kept in {@code indexCacheList}.
 * {@code numSpills} is incremented afterwards.
 *
 * @param mstart first record index (inclusive) to spill
 * @param mend record index (exclusive) at which spilling stops
 * @param sameKeyCount passed to {@code isRLENeeded} to decide whether writers use RLE
 * @param totalKeysCount passed to {@code isRLENeeded} to decide whether writers use RLE
 * @throws IOException if creating or writing the spill file or its index fails
 * @throws InterruptedException propagated from the calls made while spilling (e.g. the combiner)
 */
protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount)
    throws IOException, InterruptedException {
  // Approximate the length of the output file (passed to getSpillFileForWrite as a size hint)
  // as the bytes this spill occupies in the circular key/value buffer plus a header allowance
  // per partition. When the region wraps (bufend < bufstart) it spans [bufstart, bufvoid)
  // followed by [0, bufend), i.e. (bufvoid - bufstart) + bufend bytes.
  final long bufferedBytes = bufend >= bufstart
      ? (long) bufend - bufstart
      : ((long) bufvoid - bufstart) + bufend;
  final long size = bufferedBytes + (long) partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    spillFilePaths.put(numSpills, filename);
    out = rfs.create(filename);
    ensureSpillFilePermissions(filename, rfs);

    int spindex = mstart;
    final InMemValBytes value = createInMemValBytes();
    // Reused for every record: reset() repoints it at the key bytes before each append.
    final DataInputBuffer key = new DataInputBuffer();
    boolean rle = isRLENeeded(sameKeyCount, totalKeysCount);
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer writer = null;
      try {
        long segmentStart = out.getPos();
        // A writer is skipped for an empty partition only when sendEmptyPartitionDetails is
        // set; otherwise every partition gets a (possibly empty) segment.
        if (spindex < mend && kvmeta.get(offsetFor(spindex) + PARTITION) == i
            || !sendEmptyPartitionDetails) {
          writer = new Writer(serializationContext.getKeySerialization(),
              serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
              serializationContext.getValueClass(), codec, spilledRecordsCounter, null, rle);
        }
        if (combiner == null) {
          // spill directly
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex)
                  + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            TezRawKeyValueIterator kvIter =
                new MRResultIterator(spstart, spindex);
            if (LOG.isDebugEnabled()) {
              LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "Running combine processor");
            }
            runCombineProcessor(kvIter, writer);
          }
        }
        long rawLength = 0;
        long partLength = 0;
        // close the writer
        if (writer != null) {
          writer.close();
          rawLength = writer.getRawLength();
          partLength = writer.getCompressedLength();
        }
        adjustSpillCounters(rawLength, partLength);
        // record offsets
        final TezIndexRecord rec =
            new TezIndexRecord(segmentStart, rawLength, partLength);
        spillRec.putIndex(rec, i);
        if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) {
          partitionStats[i] += rawLength;
        }
        // Cleared so the finally block only closes a writer left open by an exception.
        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillFileIndexPaths.put(numSpills, indexFilename);
      spillRec.writeToFile(indexFilename, conf, localFs);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills
        + " at " + filename.toString());
    ++numSpills;
    if (!isFinalMergeEnabled()) {
      numShuffleChunks.setValue(numSpills);
    } else if (numSpills > 1) {
      // Increment only when there was at least one previous spill
      numAdditionalSpills.increment(1);
    }
  } finally {
    if (out != null) out.close();
  }
}