in spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java [515:631]
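/**
 * Sorts the in-memory record pointers by partition id via the native sorter and writes the
 * sorted rows, one partition at a time, to the current spill file. When {@code isLastFile} is
 * true the bytes written count towards shuffle write metrics; otherwise they are captured in a
 * temporary {@link ShuffleWriteMetrics} and later reported as disk bytes spilled.
 */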
public void writeSortedFileNative(boolean isLastFile) throws IOException {
// This call performs the actual sort.
long arrayAddr = this.sorterArray.getBaseOffset();
int pos = inMemSorter.numRecords();
nativeLib.sortRowPartitionsNative(arrayAddr, pos);
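// After the native sort, the packed record pointers are ordered by partition id, so the
// iterator below visits all records belonging to the same partition consecutively.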
ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
new ShuffleInMemorySorter.ShuffleSorterIterator(pos, this.sorterArray, 0);
// If there are no sorted records, we don't need to create an empty spill file.
if (!sortedRecords.hasNext()) {
return;
}
final ShuffleWriteMetricsReporter writeMetricsToUse;
if (isLastFile) {
// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
writeMetricsToUse = writeMetrics;
} else {
// We're spilling, so bytes written should be counted towards spill rather than write.
// Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
// them towards shuffle bytes written.
writeMetricsToUse = new ShuffleWriteMetrics();
}
int currentPartition = -1;
final RowPartition rowPartition = new RowPartition(initialSize);
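// `currentPartition` starts at -1 (no partition seen yet). Row addresses and sizes are
// collected into `rowPartition` and flushed whenever the partition id changes.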
while (sortedRecords.hasNext()) {
sortedRecords.loadNext();
final int partition = sortedRecords.packedRecordPointer.getPartitionId();
assert (partition >= currentPartition);
if (partition != currentPartition) {
// Switch to the new partition
if (currentPartition != -1) {
if (partitionChecksums.length > 0) {
// If checksum is enabled, we need to update the checksum for the current partition.
setChecksum(partitionChecksums[currentPartition]);
setChecksumAlgo(checksumAlgorithm);
}
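// Write the rows buffered for the previous partition and record how many bytes it occupies
// in the spill file.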
long written =
doSpilling(
dataTypes,
spillInfo.file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio,
compressionCodec,
compressionLevel);
spillInfo.partitionLengths[currentPartition] = written;
// Store the checksum for the current partition.
partitionChecksums[currentPartition] = getChecksum();
}
currentPartition = partition;
}
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final long recordOffsetInPage = allocator.getOffsetInPage(recordPointer);
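// Each record is laid out as [record length (uaoSize bytes)][partition id (4 bytes)][row bytes],
// so the stored length includes the 4-byte partition id but not the length field itself.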
// Note that we need to skip over the record key (partition id).
// Note that we already use off-heap memory for serialized rows, so recordPage is always
// null.
int recordSizeInBytes = UnsafeAlignedOffset.getSize(null, recordOffsetInPage) - 4;
long recordReadPosition = recordOffsetInPage + uaoSize + 4; // skip over record length too
rowPartition.addRow(recordReadPosition, recordSizeInBytes);
}
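// Flush the rows collected for the last partition, if any records were seen at all.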
if (currentPartition != -1) {
long written =
doSpilling(
dataTypes,
spillInfo.file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio,
compressionCodec,
compressionLevel);
spillInfo.partitionLengths[currentPartition] = written;
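// `spills` may be accessed from other threads, so registering the new spill file is synchronized.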
synchronized (spills) {
spills.add(spillInfo);
}
}
if (!isLastFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
// relies on its `recordWritten()` method being called in order to trigger periodic updates to
// `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
// counter at a higher level, then the in-progress metrics for records written and bytes
// written would get out of sync.
//
// When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
// in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
// metrics to the true write metrics here. The reason for performing this copying is so that
// we can avoid reporting spilled bytes as shuffle write bytes.
//
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// SPARK-3577 tracks the spill time separately.
// This is guaranteed to be a ShuffleWriteMetrics based on the `isLastFile` check at the
// beginning of this method.
synchronized (writeMetrics) {
writeMetrics.incRecordsWritten(
((ShuffleWriteMetrics) writeMetricsToUse).recordsWritten());
taskContext
.taskMetrics()
.incDiskBytesSpilled(((ShuffleWriteMetrics) writeMetricsToUse).bytesWritten());
}
}
}