native/core/benches/row_columnar.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
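
//! Criterion benchmark that converts rows in Spark's UnsafeRow format into Arrow
//! columnar batches via `process_sorted_row_partition`.
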
use arrow::datatypes::DataType as ArrowDataType;
use comet::execution::shuffle::row::{
process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
};
use comet::execution::shuffle::CompressionCodec;
use criterion::{criterion_group, criterion_main, Criterion};
use tempfile::Builder;
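
// Shape of the synthetic input: NUM_ROWS rows of NUM_COLS columns, processed in
// batches of up to BATCH_SIZE rows.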
const NUM_ROWS: usize = 10000;
const BATCH_SIZE: usize = 5000;
const NUM_COLS: usize = 100;
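// Byte size of one row: the null-tracking bitset followed by NUM_COLS 8-byte fields.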
const ROW_SIZE: usize = SparkUnsafeRow::get_row_bitset_width(NUM_COLS) + NUM_COLS * 8;

fn benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("row_array_conversion");
group.bench_function("row_to_array", |b| {
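        // Build the inputs once, outside the timed loop. Each SparkUnsafeRow is pointed
        // at its backing Row buffer, so the Row values are kept alive alongside them to
        // ensure the row addresses stay valid while the benchmark runs.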
        let rows = (0..NUM_ROWS)
.map(|_| {
let mut spark_row = SparkUnsafeRow::new_with_num_fields(NUM_COLS);
let mut row = Row::new();
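                // Fill everything after the null-tracking bitset with arbitrary bytes.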
for i in SparkUnsafeRow::get_row_bitset_width(NUM_COLS)..ROW_SIZE {
row.data[i] = i as u8;
}
row.to_spark_row(&mut spark_row);
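                // Mark every column as non-null.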
for i in 0..NUM_COLS {
spark_row.set_not_null_at(i);
}
                (row, spark_row)
})
.collect::<Vec<_>>();
        let mut row_addresses = rows
            .iter()
            .map(|(_, spark_row)| spark_row.get_row_addr())
.collect::<Vec<_>>();
        let mut row_sizes = rows
            .iter()
            .map(|(_, spark_row)| spark_row.get_row_size())
.collect::<Vec<_>>();
let row_address_ptr = row_addresses.as_mut_ptr();
let row_size_ptr = row_sizes.as_mut_ptr();
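        // Every column is a fixed-width 8-byte Int64, matching the row layout above.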
let schema = vec![ArrowDataType::Int64; NUM_COLS];
b.iter(|| {
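            // Each iteration converts all rows to Arrow batches and writes them,
            // Zstd-compressed, to a fresh temporary file.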
let tempfile = Builder::new().tempfile().unwrap();
process_sorted_row_partition(
NUM_ROWS,
BATCH_SIZE,
row_address_ptr,
row_size_ptr,
&schema,
tempfile.path().to_str().unwrap().to_string(),
1.0,
false,
0,
None,
&CompressionCodec::Zstd(1),
)
.unwrap();
});
    });
    group.finish();
}
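
/// Owned, heap-allocated buffer holding the raw bytes of a single Spark unsafe row.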
struct Row {
data: Box<[u8; ROW_SIZE]>,
}
impl Row {
pub fn new() -> Self {
Row {
data: Box::new([0u8; ROW_SIZE]),
}
}
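
    /// Points `spark_row` at this buffer's bytes via `point_to_slice`.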
pub fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
spark_row.point_to_slice(self.data.as_ref());
}
}

fn config() -> Criterion {
    Criterion::default()
}

criterion_group! {
name = benches;
config = config();
targets = benchmark
}
criterion_main!(benches);