public void write()

in spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java [156:248]


  public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
    ShuffleMapOutputWriter mapOutputWriter =
        shuffleExecutorComponents.createMapOutputWriter(shuffleId, mapId, numPartitions);
    try {
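      // Fast path: no input records, so commit all partitions as empty (with an empty
      // checksum) and report the resulting MapStatus without creating any disk writers.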
      if (!records.hasNext()) {
        partitionLengths =
            mapOutputWriter
                .commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
                .getPartitionLengths();
        mapStatus =
            MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
        return;
      }
      final long openStartTime = System.nanoTime();
      partitionWriters = new CometDiskBlockWriter[numPartitions];
      partitionWriterSegments = new FileSegment[numPartitions];

      final String checksumAlgorithm = getChecksumAlgorithm(conf);

      // Allocate the disk writers, and open the files that we'll be writing to
      for (int i = 0; i < numPartitions; i++) {
        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
            blockManager.diskBlockManager().createTempShuffleBlock();
        final File file = tempShuffleBlockIdPlusFile._2();
        CometDiskBlockWriter writer =
            new CometDiskBlockWriter(
                file,
                memoryManager,
                taskContext,
                serializer,
                schema,
                writeMetrics,
                conf,
                isAsync,
                asyncThreadNum,
                threadPool);
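        // If shuffle checksums are enabled, attach this partition's checksum and the
        // configured checksum algorithm to the writer.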
        if (partitionChecksums.length > 0) {
          writer.setChecksum(partitionChecksums[i]);
          writer.setChecksumAlgo(checksumAlgorithm);
        }
        partitionWriters[i] = writer;
      }
      // Creating the file to write to and creating a disk writer both involve interacting with
      // the disk, and can take a long time in aggregate when we open many files, so should be
      // included in the shuffle write time.
      writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

      long outputRows = 0;

      while (records.hasNext()) {
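        // Route the record to the disk writer of the partition chosen by the partitioner,
        // counting rows so the total can be cross-checked after the writers are closed.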
        outputRows += 1;

        final Product2<K, V> record = records.next();
        final K key = record._1();
        // Safety: `CometBypassMergeSortShuffleWriter` is only used when dealing with Comet shuffle
        // dependencies, which always produce `ColumnarBatch`es.
        final int partitionId = partitioner.getPartition(key);
        partitionWriters[partitionId].insertRow((UnsafeRow) record._2(), partitionId);
      }

      long spillRecords = 0;

      for (int i = 0; i < numPartitions; i++) {
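        // Close each writer to flush its remaining data and capture the resulting file
        // segment, tallying how many records the writer actually spilled to disk.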
        CometDiskBlockWriter writer = partitionWriters[i];
        partitionWriterSegments[i] = writer.close();

        spillRecords += writer.getOutputRecords();
      }

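      // Sanity check: every row routed above must be accounted for by exactly one writer.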
      if (outputRows != spillRecords) {
        throw new RuntimeException(
            "outputRows("
                + outputRows
                + ") != spillRecords("
                + spillRecords
                + "). Please file a bug report.");
      }

      // TODO: We can probably move checksum generation to this point, where the partition files
      // are concatenated
      partitionLengths = writePartitionedData(mapOutputWriter);
      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
    } catch (Exception e) {
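      // Best-effort cleanup: abort the map output writer, and if the abort itself fails,
      // suppress that failure so the original exception is the one that propagates.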
      try {
        mapOutputWriter.abort(e);
      } catch (Exception e2) {
        logger.error("Failed to abort the writer after failing to write map output.", e2);
        e.addSuppressed(e2);
      }
      throw e;
    }
  }
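
For context, the overall strategy above is the classic bypass-merge-sort shuffle: write each record to a per-partition temporary file, then concatenate those files in partition order while recording each partition's byte length. The sketch below illustrates that strategy in isolation, assuming plain byte-array records and java.io primitives; the names (BypassMergeSketch, writePartitioned) are hypothetical and are not part of Comet or Spark.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;

// Hypothetical illustration only; not Comet/Spark API.
final class BypassMergeSketch {

  // Writes each record to a temp file for its partition, then concatenates the temp files
  // into `output` in partition order, returning the byte length of every partition.
  static long[] writePartitioned(
      byte[][] records, int[] partitionIds, int numPartitions, File output) throws IOException {
    File[] tempFiles = new File[numPartitions];
    OutputStream[] writers = new OutputStream[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      tempFiles[i] = File.createTempFile("bypass-part-" + i + "-", ".tmp");
      writers[i] = new FileOutputStream(tempFiles[i]);
    }
    // Step 1: route every record to its partition's writer (analogous to the insertRow loop).
    for (int r = 0; r < records.length; r++) {
      writers[partitionIds[r]].write(records[r]);
    }
    for (OutputStream w : writers) {
      w.close();
    }
    // Step 2: concatenate the per-partition files and record their lengths (analogous to
    // writePartitionedData / commitAllPartitions producing partitionLengths).
    long[] lengths = new long[numPartitions];
    try (OutputStream merged = new FileOutputStream(output)) {
      for (int i = 0; i < numPartitions; i++) {
        lengths[i] = Files.copy(tempFiles[i].toPath(), merged);
        tempFiles[i].delete();
      }
    }
    return lengths;
  }
}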