client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/shuffle/celeborn/ColumnarHashBasedShuffleWriter.java [43:161]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Private
public class ColumnarHashBasedShuffleWriter<K, V, C> extends HashBasedShuffleWriter<K, V, C> {

  private static final Logger logger =
      LoggerFactory.getLogger(ColumnarHashBasedShuffleWriter.class);

  private final int stageId;
  private final int shuffleId;
  private final CelebornBatchBuilder[] celebornBatchBuilders;
  private final StructType schema;
  private final Serializer depSerializer;
  private final boolean isColumnarShuffle;
  private final int columnarShuffleBatchSize;
  private final boolean columnarShuffleCodeGenEnabled;
  private final boolean columnarShuffleDictionaryEnabled;
  private final double columnarShuffleDictionaryMaxFactor;

  public ColumnarHashBasedShuffleWriter(
      int shuffleId,
      CelebornShuffleHandle<K, V, C> handle,
      TaskContext taskContext,
      CelebornConf conf,
      ShuffleClient client,
      ShuffleWriteMetricsReporter metrics,
      SendBufferPool sendBufferPool)
      throws IOException {
    super(shuffleId, handle, taskContext, conf, client, metrics, sendBufferPool);
    columnarShuffleBatchSize = conf.columnarShuffleBatchSize();
    columnarShuffleCodeGenEnabled = conf.columnarShuffleCodeGenEnabled();
    columnarShuffleDictionaryEnabled = conf.columnarShuffleDictionaryEnabled();
    columnarShuffleDictionaryMaxFactor = conf.columnarShuffleDictionaryMaxFactor();
    ShuffleDependency<?, ?, ?> shuffleDependency = handle.dependency();
    this.stageId = taskContext.stageId();
    this.shuffleId = shuffleDependency.shuffleId();
    this.schema = CustomShuffleDependencyUtils.getSchema(shuffleDependency);
    this.depSerializer = shuffleDependency.serializer();
    this.celebornBatchBuilders =
        new CelebornBatchBuilder[shuffleDependency.partitioner().numPartitions()];
    this.isColumnarShuffle = schema != null && CelebornBatchBuilder.supportsColumnarType(schema);
  }

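  // Take the columnar fast path only when the dependency carries a schema
  // whose field types the columnar batch builders support.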
  @Override
  protected void fastWrite0(scala.collection.Iterator iterator)
      throws IOException, InterruptedException {
    if (isColumnarShuffle) {
      logger.info("Using fast columnar write for shuffle {} in stage {}.", shuffleId, stageId);
      fastColumnarWrite0(iterator);
    } else {
      super.fastWrite0(iterator);
    }
  }

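  // Accumulates rows into per-partition columnar batches; each batch is
  // encoded and pushed once it reaches columnarShuffleBatchSize rows.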
  private void fastColumnarWrite0(scala.collection.Iterator iterator) throws IOException {
    final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;

    SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    while (records.hasNext()) {
      final Product2<Integer, UnsafeRow> record = records.next();
      final int partitionId = record._1();
      final UnsafeRow row = record._2();

      if (celebornBatchBuilders[partitionId] == null) {
        CelebornBatchBuilder builder;
        if (columnarShuffleCodeGenEnabled && !columnarShuffleDictionaryEnabled) {
          builder = new CelebornColumnarBatchCodeGenBuild().create(schema, columnarShuffleBatchSize);
        } else {
          builder =
              new CelebornColumnarBatchBuilder(
                  schema,
                  columnarShuffleBatchSize,
                  columnarShuffleDictionaryMaxFactor,
                  columnarShuffleDictionaryEnabled);
        }
        builder.newBuilders();
        celebornBatchBuilders[partitionId] = builder;
      }

      celebornBatchBuilders[partitionId].writeRow(row);
      if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
        byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
        pushGiantRecord(partitionId, arr, arr.length);
        if (dataSize != null) {
          dataSize.add(arr.length);
        }
        celebornBatchBuilders[partitionId].newBuilders();
      }
      tmpRecordsWritten++;
    }
  }

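  // Flush through the columnar path only when the columnar fast path was
  // used; otherwise defer to the row-based close.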
  @Override
  protected void closeWrite() throws IOException {
    if (canUseFastWrite() && isColumnarShuffle) {
      closeColumnarWrite();
    } else {
      super.closeWrite();
    }
  }

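  // Encodes and merges any rows still buffered in the per-partition builders.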
  private void closeColumnarWrite() throws IOException {
    SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    for (int i = 0; i < celebornBatchBuilders.length; i++) {
      final CelebornBatchBuilder builder = celebornBatchBuilders[i];
      if (builder != null && builder.getRowCnt() > 0) {
        byte[] bytes = builder.buildColumnBytes();
        if (dataSize != null) {
          dataSize.add(bytes.length);
        }
        mergeData(i, bytes, 0, bytes.length);
        // drop the builder reference so its buffers can be reclaimed
        celebornBatchBuilders[i] = null;
      }
    }
  }

  @VisibleForTesting
  public boolean isColumnarShuffle() {
    return isColumnarShuffle;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
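
The loop in fastColumnarWrite0 is a per-partition batch-and-flush pattern: rows accumulate in a lazily created builder keyed by reduce partition, a full batch is serialized and pushed as a single record, and any leftovers are flushed on close. Below is a minimal, self-contained sketch of that pattern; RowBatch, push(), and BATCH_SIZE are hypothetical stand-ins for CelebornBatchBuilder, the ShuffleClient push, and columnarShuffleBatchSize, not the actual Celeborn API.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the batch-and-flush pattern in fastColumnarWrite0.
// RowBatch and push() are hypothetical stand-ins, not Celeborn classes.
public class BatchingSketch {
  static final int BATCH_SIZE = 4; // stands in for columnarShuffleBatchSize

  // Hypothetical stand-in for a columnar batch builder.
  static class RowBatch {
    final List<String> rows = new ArrayList<>();
    int rowCount() { return rows.size(); }
    void writeRow(String row) { rows.add(row); }
    byte[] build() { return String.join(",", rows).getBytes(); }
    void reset() { rows.clear(); }
  }

  public static void main(String[] args) {
    int numPartitions = 2;
    RowBatch[] batches = new RowBatch[numPartitions];
    // (partitionId, row) pairs, as produced by the shuffle record iterator.
    int[] partitions = {0, 1, 0, 0, 0, 1};
    String[] rows = {"a", "b", "c", "d", "e", "f"};

    for (int i = 0; i < rows.length; i++) {
      int p = partitions[i];
      if (batches[p] == null) { // lazily create one builder per partition
        batches[p] = new RowBatch();
      }
      batches[p].writeRow(rows[i]);
      if (batches[p].rowCount() >= BATCH_SIZE) { // flush a full batch
        push(p, batches[p].build());
        batches[p].reset(); // start a fresh batch
      }
    }
    // At close, flush partially filled batches (cf. closeColumnarWrite).
    for (int p = 0; p < numPartitions; p++) {
      if (batches[p] != null && batches[p].rowCount() > 0) {
        push(p, batches[p].build());
        batches[p] = null; // free the builder
      }
    }
  }

  static void push(int partitionId, byte[] bytes) {
    System.out.println("push partition " + partitionId + ": " + new String(bytes));
  }
}

Flushing at a fixed row count bounds per-partition memory while still amortizing the per-push overhead over many rows.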



client-spark/spark-4-columnar-shuffle/src/main/java/org/apache/spark/shuffle/celeborn/ColumnarHashBasedShuffleWriter.java [43:161]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Private
public class ColumnarHashBasedShuffleWriter<K, V, C> extends HashBasedShuffleWriter<K, V, C> {

  private static final Logger logger =
      LoggerFactory.getLogger(ColumnarHashBasedShuffleWriter.class);

  private final int stageId;
  private final int shuffleId;
  private final CelebornBatchBuilder[] celebornBatchBuilders;
  private final StructType schema;
  private final Serializer depSerializer;
  private final boolean isColumnarShuffle;
  private final int columnarShuffleBatchSize;
  private final boolean columnarShuffleCodeGenEnabled;
  private final boolean columnarShuffleDictionaryEnabled;
  private final double columnarShuffleDictionaryMaxFactor;

  public ColumnarHashBasedShuffleWriter(
      int shuffleId,
      CelebornShuffleHandle<K, V, C> handle,
      TaskContext taskContext,
      CelebornConf conf,
      ShuffleClient client,
      ShuffleWriteMetricsReporter metrics,
      SendBufferPool sendBufferPool)
      throws IOException {
    super(shuffleId, handle, taskContext, conf, client, metrics, sendBufferPool);
    columnarShuffleBatchSize = conf.columnarShuffleBatchSize();
    columnarShuffleCodeGenEnabled = conf.columnarShuffleCodeGenEnabled();
    columnarShuffleDictionaryEnabled = conf.columnarShuffleDictionaryEnabled();
    columnarShuffleDictionaryMaxFactor = conf.columnarShuffleDictionaryMaxFactor();
    ShuffleDependency<?, ?, ?> shuffleDependency = handle.dependency();
    this.stageId = taskContext.stageId();
    this.shuffleId = shuffleDependency.shuffleId();
    this.schema = CustomShuffleDependencyUtils.getSchema(shuffleDependency);
    this.depSerializer = shuffleDependency.serializer();
    this.celebornBatchBuilders =
        new CelebornBatchBuilder[shuffleDependency.partitioner().numPartitions()];
    this.isColumnarShuffle = schema != null && CelebornBatchBuilder.supportsColumnarType(schema);
  }

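  // Take the columnar fast path only when the dependency carries a schema
  // whose field types the columnar batch builders support.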
  @Override
  protected void fastWrite0(scala.collection.Iterator iterator)
      throws IOException, InterruptedException {
    if (isColumnarShuffle) {
      logger.info("Using fast columnar write for shuffle {} in stage {}.", shuffleId, stageId);
      fastColumnarWrite0(iterator);
    } else {
      super.fastWrite0(iterator);
    }
  }

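  // Accumulates rows into per-partition columnar batches; each batch is
  // encoded and pushed once it reaches columnarShuffleBatchSize rows.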
  private void fastColumnarWrite0(scala.collection.Iterator iterator) throws IOException {
    final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;

    SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    while (records.hasNext()) {
      final Product2<Integer, UnsafeRow> record = records.next();
      final int partitionId = record._1();
      final UnsafeRow row = record._2();

      if (celebornBatchBuilders[partitionId] == null) {
        CelebornBatchBuilder builder;
        if (columnarShuffleCodeGenEnabled && !columnarShuffleDictionaryEnabled) {
          builder = new CelebornColumnarBatchCodeGenBuild().create(schema, columnarShuffleBatchSize);
        } else {
          builder =
              new CelebornColumnarBatchBuilder(
                  schema,
                  columnarShuffleBatchSize,
                  columnarShuffleDictionaryMaxFactor,
                  columnarShuffleDictionaryEnabled);
        }
        builder.newBuilders();
        celebornBatchBuilders[partitionId] = builder;
      }

      celebornBatchBuilders[partitionId].writeRow(row);
      if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
        byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
        pushGiantRecord(partitionId, arr, arr.length);
        if (dataSize != null) {
          dataSize.add(arr.length);
        }
        celebornBatchBuilders[partitionId].newBuilders();
      }
      tmpRecordsWritten++;
    }
  }

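  // Flush through the columnar path only when the columnar fast path was
  // used; otherwise defer to the row-based close.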
  @Override
  protected void closeWrite() throws IOException {
    if (canUseFastWrite() && isColumnarShuffle) {
      closeColumnarWrite();
    } else {
      super.closeWrite();
    }
  }

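  // Encodes and merges any rows still buffered in the per-partition builders.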
  private void closeColumnarWrite() throws IOException {
    SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    for (int i = 0; i < celebornBatchBuilders.length; i++) {
      final CelebornBatchBuilder builder = celebornBatchBuilders[i];
      if (builder != null && builder.getRowCnt() > 0) {
        byte[] bytes = builder.buildColumnBytes();
        if (dataSize != null) {
          dataSize.add(bytes.length);
        }
        mergeData(i, bytes, 0, bytes.length);
        // drop the builder reference so its buffers can be reclaimed
        celebornBatchBuilders[i] = null;
      }
    }
  }

  @VisibleForTesting
  public boolean isColumnarShuffle() {
    return isColumnarShuffle;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
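
Both copies select a builder per partition with the same two-flag condition: the code-generated builder is used only when codegen is enabled and dictionary encoding is disabled; every other combination falls back to the interpreted builder, which accepts the dictionary settings. A compact sketch of that selection, with hypothetical BatchBuilder types standing in for CelebornColumnarBatchCodeGenBuild and CelebornColumnarBatchBuilder (the "codegen cannot handle dictionaries" reading is inferred from the condition, not stated in the source):

// Hypothetical stand-ins for CelebornBatchBuilder and its two implementations.
interface BatchBuilder {}

class CodeGenBuilder implements BatchBuilder {}

class InterpretedBuilder implements BatchBuilder {
  final boolean dictionaryEnabled;

  InterpretedBuilder(boolean dictionaryEnabled) {
    this.dictionaryEnabled = dictionaryEnabled;
  }
}

public class BuilderSelectionSketch {
  // Mirrors the condition in fastColumnarWrite0: the code-generated builder
  // is chosen only when codegen is on and dictionary encoding is off.
  static BatchBuilder select(boolean codeGenEnabled, boolean dictionaryEnabled) {
    if (codeGenEnabled && !dictionaryEnabled) {
      return new CodeGenBuilder();
    }
    return new InterpretedBuilder(dictionaryEnabled);
  }

  public static void main(String[] args) {
    System.out.println(select(true, false).getClass().getSimpleName());  // CodeGenBuilder
    System.out.println(select(true, true).getClass().getSimpleName());   // InterpretedBuilder
    System.out.println(select(false, false).getClass().getSimpleName()); // InterpretedBuilder
  }
}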
