in spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java [156:248]
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  ShuffleMapOutputWriter mapOutputWriter =
      shuffleExecutorComponents.createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
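    // Fast path for an empty input: commit zero-length partitions and record the map status
    // without allocating any per-partition writers.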
    if (!records.hasNext()) {
      partitionLengths =
          mapOutputWriter
              .commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
              .getPartitionLengths();
      mapStatus =
          MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final long openStartTime = System.nanoTime();
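    // One CometDiskBlockWriter and one resulting FileSegment slot per reduce partition.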
    partitionWriters = new CometDiskBlockWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    final String checksumAlgorithm = getChecksumAlgorithm(conf);
    // Allocate the disk writers, and open the files that we'll be writing to
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
          blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      CometDiskBlockWriter writer =
          new CometDiskBlockWriter(
              file,
              memoryManager,
              taskContext,
              serializer,
              schema,
              writeMetrics,
              conf,
              isAsync,
              asyncThreadNum,
              threadPool);
      if (partitionChecksums.length > 0) {
        writer.setChecksum(partitionChecksums[i]);
        writer.setChecksumAlgo(checksumAlgorithm);
      }
      partitionWriters[i] = writer;
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
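    // Count every row routed to a partition writer so it can be cross-checked against the
    // writers' own record counts after they are closed.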
    long outputRows = 0;
    while (records.hasNext()) {
      outputRows += 1;
      final Product2<K, V> record = records.next();
      final K key = record._1();
      // Safety: `CometBypassMergeSortShuffleWriter` is only used when dealing with Comet shuffle
      // dependencies, which always produce `ColumnarBatch`es.
      final int partitionId = partitioner.getPartition(key);
      partitionWriters[partitionId].insertRow((UnsafeRow) record._2(), partitionId);
    }
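    // Close each writer, collecting its file segment and the number of records it reported
    // writing.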
    long spillRecords = 0;
    for (int i = 0; i < numPartitions; i++) {
      CometDiskBlockWriter writer = partitionWriters[i];
      partitionWriterSegments[i] = writer.close();
      spillRecords += writer.getOutputRecords();
    }
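    // Sanity check: every row routed above must be accounted for by exactly one writer.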
    if (outputRows != spillRecords) {
      throw new RuntimeException(
          "outputRows("
              + outputRows
              + ") != spillRecords("
              + spillRecords
              + "). Please file a bug report.");
    }
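    // Write the per-partition temp files into the final map output and record the resulting
    // partition lengths.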
    // TODO: We probably can move checksum generation here when concatenating partition files
    partitionLengths = writePartitionedData(mapOutputWriter);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
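    // Best-effort cleanup: abort the map output writer so partial output is discarded, and never
    // let a failure during abort mask the original exception.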
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
      logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}