in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java [751:872]
public List<Event> close() throws IOException, InterruptedException {
  // Schedules a spill for any remaining buffers, blocks until no spill is pending (or one of
  // them has failed), and then produces the events for the final output.
  //
  // After a clean wait there are three ways out:
  //   - pipelined shuffle: a last spill is attempted, its events are handed to
  //     mayBeSendEventsForSpill, and an empty list is returned;
  //   - skipBuffers: the writer is closed and a VM event followed by a DM event is returned;
  //   - otherwise: existing spills are merged, or a final spill is done, and the resulting
  //     events are returned.
  //
  // Throws IOException when a spill recorded a failure (wrapping it if it is not already an
  // IOException) and InterruptedException when the wait on the spill condition is interrupted.

  // In case there are buffers to be spilled, schedule spilling
  scheduleSpill(true);
  List<Event> eventList = Lists.newLinkedList();
  isShutdown.set(true);

  spillLock.lock();
  try {
    LOG.info(
        sourceDestNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
    // Keep waiting while spills are outstanding; bail out early once a failure is recorded.
    while (pendingSpillCount.get() != 0 && spillException == null) {
      spillInProgress.await();
    }
  } finally {
    spillLock.unlock();
  }

  // ---- Failure: release resources and surface the spill error ----
  if (spillException != null) {
    LOG.error(sourceDestNameTrimmed + ": " + "Error during spill, throwing");
    // Assuming close will be called on the same thread as the write
    cleanup();
    currentBuffer.cleanup();
    currentBuffer = null;
    if (spillException instanceof IOException) {
      throw (IOException) spillException;
    }
    throw new IOException(spillException);
  }

  LOG.info(sourceDestNameTrimmed + ": " + "All spills complete");
  // Assuming close will be called on the same thread as the write
  cleanup();

  // ---- Pipelined shuffle ----
  if (pipelinedShuffle) {
    // Nothing is ever added to this list; events for this path are not returned to the caller.
    List<Event> noEvents = Lists.newLinkedList();
    // For pipelined case, send out an event in case finalspill generated a spill file.
    if (finalSpill() != null) {
      // VertexManagerEvent is only sent at the end and thus sizePerPartition is used
      // for the sum of all spills.
      mayBeSendEventsForSpill(currentBuffer.recordsPerPartition,
          sizePerPartition, numSpills.get() - 1, true);
    }
    updateTezCountersAndNotify();
    cleanupCurrentBuffer();
    return noEvents;
  }

  // ---- Non-pipelined, buffers skipped: records went straight to the writer ----
  if (skipBuffers) {
    writer.close();
    long rawLength = writer.getRawLength();
    long compressedLength = writer.getCompressedLength();

    BitSet emptyPartitions = new BitSet();
    if (outputRecordsCounter.getValue() == 0) {
      // No records written: flag partition 0 as empty.
      emptyPartitions.set(0);
    }
    if (reportPartitionStats() && outputRecordsCounter.getValue() > 0) {
      sizePerPartition[0] = rawLength;
    }

    cleanupCurrentBuffer();

    if (outputRecordsCounter.getValue() > 0) {
      outputBytesWithOverheadCounter.increment(rawLength);
      fileOutputBytesCounter.increment(compressedLength + indexFileSizeEstimate);
    }

    eventList.add(generateVMEvent());

    if (!canSendDataOverDME()) {
      // Write a one-entry index file (slot 0) for the writer's output.
      TezIndexRecord indexRecord = new TezIndexRecord(0, rawLength, compressedLength);
      TezSpillRecord spillRecord = new TezSpillRecord(1);
      spillRecord.putIndex(indexRecord, 0);
      finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
      spillRecord.writeToFile(finalIndexPath, conf, localFs);
    }

    eventList.add(
        generateDMEvent(false, -1, false, outputContext.getUniqueIdentifier(), emptyPartitions));
    return eventList;
  }

  // ---- Non-pipelined, buffered ----
  //   Final merge disabled:
  //     - If finalSpill generated data, generate events and return
  //     - If finalSpill did not generate data, it would automatically populate events
  //   Final merge enabled:
  //     - When lots of spills are there, mergeAll, generate events and return
  //     - If there are no existing spills, check for final spill and generate events
  if (!isFinalMergeEnabled) {
    // if no data is generated, finalSpill would create VMEvent & add to finalEvents
    SpillResult lastSpill = finalSpill();
    if (lastSpill != null) {
      updateTezCountersAndNotify();
      // Generate vm event
      finalEvents.add(generateVMEvent());

      // compute empty partitions based on spill result and generate DME
      int lastSpillIndex = numSpills.get() - 1;
      SpillCallback statsCallback = new SpillCallback(lastSpillIndex);
      statsCallback.computePartitionStats(lastSpill);
      BitSet emptyPartitions = getEmptyPartitions(statsCallback.getRecordsPerPartition());
      String pathComponent =
          generatePathComponent(outputContext.getUniqueIdentifier(), lastSpillIndex);
      finalEvents.add(
          generateDMEvent(true, lastSpillIndex, true, pathComponent, emptyPartitions));
    }
    // all events to be sent out are in finalEvents.
    eventList.addAll(finalEvents);
  } else {
    if (numSpills.get() > 0) {
      mergeAll();
    } else {
      finalSpill();
    }
    updateTezCountersAndNotify();
    eventList.add(generateVMEvent());
    eventList.add(generateDMEvent());
  }

  cleanupCurrentBuffer();
  return eventList;
}