public void writeSortedFileNative(boolean isLastFile)

in spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java [515:631]


    public void writeSortedFileNative(boolean isLastFile) throws IOException {
      long arrayAddr = this.sorterArray.getBaseOffset();
      int pos = inMemSorter.numRecords();
      // This call performs the actual sort.
      nativeLib.sortRowPartitionsNative(arrayAddr, pos);
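      // The packed record pointers in sorterArray are now ordered by partition id, so the
      // iterator below yields records grouped by partition.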
      ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
          new ShuffleInMemorySorter.ShuffleSorterIterator(pos, this.sorterArray, 0);

      // If there are no sorted records, we don't need to create an empty spill file.
      if (!sortedRecords.hasNext()) {
        return;
      }

      final ShuffleWriteMetricsReporter writeMetricsToUse;

      if (isLastFile) {
        // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
        writeMetricsToUse = writeMetrics;
      } else {
        // We're spilling, so bytes written should be counted towards spill rather than write.
        // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
        // them towards shuffle bytes written.
        writeMetricsToUse = new ShuffleWriteMetrics();
      }

      int currentPartition = -1;

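      // rowPartition buffers (address, size) pairs for the rows of the partition currently being
      // written; it is flushed to the spill file as a single batch each time the partition
      // changes.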
      final RowPartition rowPartition = new RowPartition(initialSize);

      while (sortedRecords.hasNext()) {
        sortedRecords.loadNext();
        final int partition = sortedRecords.packedRecordPointer.getPartitionId();
        assert (partition >= currentPartition);
        if (partition != currentPartition) {
          // Switch to the new partition
          if (currentPartition != -1) {

            if (partitionChecksums.length > 0) {
              // If checksum is enabled, we need to update the checksum for the current partition.
              setChecksum(partitionChecksums[currentPartition]);
              setChecksumAlgo(checksumAlgorithm);
            }

            long written =
                doSpilling(
                    dataTypes,
                    spillInfo.file,
                    rowPartition,
                    writeMetricsToUse,
                    preferDictionaryRatio,
                    compressionCodec,
                    compressionLevel);
            spillInfo.partitionLengths[currentPartition] = written;

            // Store the checksum for the current partition.
            partitionChecksums[currentPartition] = getChecksum();
          }
          currentPartition = partition;
        }

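        // The stored record is a uaoSize-byte length prefix, a 4-byte partition id, then the
        // serialized row bytes; the adjustments below skip the prefix and partition id so that
        // only the row bytes are added to rowPartition.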
        final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
        final long recordOffsetInPage = allocator.getOffsetInPage(recordPointer);
        // Note that we need to skip over record key (partition id)
        // Note that we already use off-heap memory for serialized rows, so recordPage is always
        // null.
        int recordSizeInBytes = UnsafeAlignedOffset.getSize(null, recordOffsetInPage) - 4;
        long recordReadPosition = recordOffsetInPage + uaoSize + 4; // skip over record length too
        rowPartition.addRow(recordReadPosition, recordSizeInBytes);
      }

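      // Write out the rows accumulated for the last partition; the loop above only flushes a
      // partition when it sees the next partition id.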
      if (currentPartition != -1) {
        long written =
            doSpilling(
                dataTypes,
                spillInfo.file,
                rowPartition,
                writeMetricsToUse,
                preferDictionaryRatio,
                compressionCodec,
                compressionLevel);
        spillInfo.partitionLengths[currentPartition] = written;

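        // Register the completed spill file; `spills` may be accessed concurrently, hence the
        // synchronization.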
        synchronized (spills) {
          spills.add(spillInfo);
        }
      }

      if (!isLastFile) { // i.e. this is a spill file
        // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when
        // records are written to disk, not when they enter the shuffle sorting code.
        // DiskBlockObjectWriter relies on its `recordWritten()` method being called in order to
        // trigger periodic updates to `shuffleBytesWritten`. If we were to remove the
        // `recordWritten()` call and increment that counter at a higher level, then the
        // in-progress metrics for records written and bytes written would get out of sync.
        //
        // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
        // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
        // metrics to the true write metrics here. The reason for performing this copying is so that
        // we can avoid reporting spilled bytes as shuffle write bytes.
        //
        // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
        // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
        // SPARK-3577 tracks the spill time separately.

        // The cast below is safe: because `isLastFile` is false here, `writeMetricsToUse` was
        // assigned a fresh ShuffleWriteMetrics at the beginning of this method.
        synchronized (writeMetrics) {
          writeMetrics.incRecordsWritten(
              ((ShuffleWriteMetrics) writeMetricsToUse).recordsWritten());
          taskContext
              .taskMetrics()
              .incDiskBytesSpilled(((ShuffleWriteMetrics) writeMetricsToUse).bytesWritten());
        }
      }
    }