client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/shuffle/celeborn/ColumnarHashBasedShuffleWriter.java [95:132]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void fastColumnarWrite0(scala.collection.Iterator iterator) throws IOException {
    final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;

    SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    while (records.hasNext()) {
      final Product2<Integer, UnsafeRow> record = records.next();
      final int partitionId = record._1();
      final UnsafeRow row = record._2();

      if (celebornBatchBuilders[partitionId] == null) {
        CelebornBatchBuilder columnBuilders;
        if (columnarShuffleCodeGenEnabled && !columnarShuffleDictionaryEnabled) {
          columnBuilders =
              new CelebornColumnarBatchCodeGenBuild().create(schema, columnarShuffleBatchSize);
        } else {
          columnBuilders =
              new CelebornColumnarBatchBuilder(
                  schema,
                  columnarShuffleBatchSize,
                  columnarShuffleDictionaryMaxFactor,
                  columnarShuffleDictionaryEnabled);
        }
        columnBuilders.newBuilders();
        celebornBatchBuilders[partitionId] = columnBuilders;
      }

      celebornBatchBuilders[partitionId].writeRow(row);
      if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
        byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
        pushGiantRecord(partitionId, arr, arr.length);
        if (dataSize != null) {
          dataSize.add(arr.length);
        }
        celebornBatchBuilders[partitionId].newBuilders();
      }
      tmpRecordsWritten++;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
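
The fragment above is repeated verbatim in the Spark 3.5 and Spark 4 columnar-shuffle modules below. Its logic: for each incoming (partitionId, UnsafeRow) pair it lazily creates a per-partition CelebornBatchBuilder, picking the code-generated builder when columnarShuffleCodeGenEnabled is set and dictionary encoding is off, and the dictionary-capable CelebornColumnarBatchBuilder otherwise. Rows are appended to that builder; once it holds columnarShuffleBatchSize rows the batch is serialized with buildColumnBytes(), pushed with pushGiantRecord(), counted into the dataSize SQL metric when one is available, and the builder is reset with newBuilders().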



client-spark/spark-4-columnar-shuffle/src/main/java/org/apache/spark/shuffle/celeborn/ColumnarHashBasedShuffleWriter.java [95:132]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void fastColumnarWrite0(scala.collection.Iterator iterator) throws IOException {
    final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;

    SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    while (records.hasNext()) {
      final Product2<Integer, UnsafeRow> record = records.next();
      final int partitionId = record._1();
      final UnsafeRow row = record._2();

      if (celebornBatchBuilders[partitionId] == null) {
        CelebornBatchBuilder columnBuilders;
        if (columnarShuffleCodeGenEnabled && !columnarShuffleDictionaryEnabled) {
          columnBuilders =
              new CelebornColumnarBatchCodeGenBuild().create(schema, columnarShuffleBatchSize);
        } else {
          columnBuilders =
              new CelebornColumnarBatchBuilder(
                  schema,
                  columnarShuffleBatchSize,
                  columnarShuffleDictionaryMaxFactor,
                  columnarShuffleDictionaryEnabled);
        }
        columnBuilders.newBuilders();
        celebornBatchBuilders[partitionId] = columnBuilders;
      }

      celebornBatchBuilders[partitionId].writeRow(row);
      if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
        byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
        pushGiantRecord(partitionId, arr, arr.length);
        if (dataSize != null) {
          dataSize.add(arr.length);
        }
        celebornBatchBuilders[partitionId].newBuilders();
      }
      tmpRecordsWritten++;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-spark/spark-3.5-columnar-shuffle/src/main/java/org/apache/spark/shuffle/celeborn/ColumnarHashBasedShuffleWriter.java [95:132]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void fastColumnarWrite0(scala.collection.Iterator iterator) throws IOException {
    final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;

    SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    while (records.hasNext()) {
      final Product2<Integer, UnsafeRow> record = records.next();
      final int partitionId = record._1();
      final UnsafeRow row = record._2();

      if (celebornBatchBuilders[partitionId] == null) {
        CelebornBatchBuilder columnBuilders;
        if (columnarShuffleCodeGenEnabled && !columnarShuffleDictionaryEnabled) {
          columnBuilders =
              new CelebornColumnarBatchCodeGenBuild().create(schema, columnarShuffleBatchSize);
        } else {
          columnBuilders =
              new CelebornColumnarBatchBuilder(
                  schema,
                  columnarShuffleBatchSize,
                  columnarShuffleDictionaryMaxFactor,
                  columnarShuffleDictionaryEnabled);
        }
        columnBuilders.newBuilders();
        celebornBatchBuilders[partitionId] = columnBuilders;
      }

      celebornBatchBuilders[partitionId].writeRow(row);
      if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
        byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
        pushGiantRecord(partitionId, arr, arr.length);
        if (dataSize != null) {
          dataSize.add(arr.length);
        }
        celebornBatchBuilders[partitionId].newBuilders();
      }
      tmpRecordsWritten++;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
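
Because the logic is identical across the three modules, the shared per-partition batching pattern can be shown in isolation. The sketch below is a minimal, self-contained illustration of that pattern only; RowBatchBuilder, BatchingWriterSketch, write(), and push() are hypothetical names invented here, plain strings stand in for UnsafeRow and the real column builders, and nothing in it is part of the Celeborn or Spark API.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CelebornBatchBuilder: it only buffers rows and
// turns them into a byte[] so the flush-on-threshold flow can be demonstrated.
class RowBatchBuilder {
  private final List<String> rows = new ArrayList<>();

  void writeRow(String row) { rows.add(row); }

  int getRowCnt() { return rows.size(); }

  // Stand-in for buildColumnBytes(): serialize the buffered rows into one array.
  byte[] buildBytes() { return String.join("\n", rows).getBytes(StandardCharsets.UTF_8); }

  // Stand-in for newBuilders(): reset the builder for the next batch.
  void reset() { rows.clear(); }
}

public class BatchingWriterSketch {
  private final RowBatchBuilder[] builders; // one lazily created builder per partition
  private final int batchSize;              // stand-in for columnarShuffleBatchSize
  private long bytesPushed = 0;             // stand-in for the dataSize SQL metric

  BatchingWriterSketch(int numPartitions, int batchSize) {
    this.builders = new RowBatchBuilder[numPartitions];
    this.batchSize = batchSize;
  }

  // Mirrors the loop body of fastColumnarWrite0: lazily create the partition's
  // builder, append the row, and flush a full batch once batchSize rows are buffered.
  void write(int partitionId, String row) {
    if (builders[partitionId] == null) {
      builders[partitionId] = new RowBatchBuilder();
    }
    RowBatchBuilder builder = builders[partitionId];
    builder.writeRow(row);
    if (builder.getRowCnt() >= batchSize) {
      byte[] batch = builder.buildBytes();
      push(partitionId, batch);        // stand-in for pushGiantRecord(partitionId, arr, arr.length)
      bytesPushed += batch.length;     // stand-in for dataSize.add(arr.length)
      builder.reset();
    }
  }

  private void push(int partitionId, byte[] batch) {
    System.out.printf("partition %d: pushed a %d-byte batch%n", partitionId, batch.length);
  }

  public static void main(String[] args) {
    BatchingWriterSketch writer = new BatchingWriterSketch(2, 3);
    for (int i = 0; i < 7; i++) {
      writer.write(i % 2, "row-" + i);
    }
    System.out.println("total bytes pushed: " + writer.bytesPushed);
  }
}

As in the quoted fragment, partially filled builders are not flushed inside the loop; in the real writer that remaining data is presumably handled outside the span shown here.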



