// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
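
// Run these benchmarks with, for example:
//   cargo bench --bench arrow_reader
// (bench target name assumed from this file's path).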
use arrow::array::Array;
use arrow::datatypes::DataType;
use arrow_schema::Field;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
make_byte_array_reader, make_fixed_len_byte_array_reader, ListArrayReader,
};
use parquet::basic::Type;
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
use parquet::{
arrow::array_reader::ArrayReader,
basic::Encoding,
column::page::PageIterator,
data_type::{ByteArrayType, Int32Type, Int64Type},
schema::types::{ColumnDescPtr, SchemaDescPtr},
};
use rand::distributions::uniform::SampleUniform;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{collections::VecDeque, sync::Arc};
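
/// Builds a test schema with mandatory and optional leaves for INT32, INT64,
/// UTF8 strings, decimals in four physical representations, and a list of
/// strings.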
fn build_test_schema() -> SchemaDescPtr {
use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
let message_type = "
message test_schema {
REQUIRED INT32 mandatory_int32_leaf;
OPTIONAL INT32 optional_int32_leaf;
REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
REQUIRED INT64 mandatory_int64_leaf;
OPTIONAL INT64 optional_int64_leaf;
REQUIRED INT32 mandatory_decimal1_leaf (DECIMAL(8,2));
OPTIONAL INT32 optional_decimal1_leaf (DECIMAL(8,2));
REQUIRED INT64 mandatory_decimal2_leaf (DECIMAL(16,2));
OPTIONAL INT64 optional_decimal2_leaf (DECIMAL(16,2));
REQUIRED BYTE_ARRAY mandatory_decimal3_leaf (DECIMAL(16,2));
OPTIONAL BYTE_ARRAY optional_decimal3_leaf (DECIMAL(16,2));
REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_decimal4_leaf (DECIMAL(16,2));
OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_decimal4_leaf (DECIMAL(16,2));
OPTIONAL GROUP string_list (LIST) {
repeated group list {
optional BYTE_ARRAY element (UTF8);
}
}
}
";
parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap()
}
// test data params
const NUM_ROW_GROUPS: usize = 1;
const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 8192;
const MAX_LIST_LEN: usize = 10;
const EXPECTED_VALUE_COUNT: usize = NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
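
/// Returns an RNG seeded with a fixed value so every run generates identical
/// test data.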
pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}

/// Builds pages of random decimal values in `min..max`, serialized as
/// big-endian bytes for BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY columns.
fn build_encoded_decimal_bytes_page_iterator<T>(
column_desc: ColumnDescPtr,
null_density: f32,
encoding: Encoding,
min: i128,
max: i128,
) -> impl PageIterator + Clone
where
T: parquet::data_type::DataType,
T::T: From<Vec<u8>>,
{
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![0; VALUES_PER_PAGE];
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
for _i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = Vec::new();
for _j in 0..PAGES_PER_GROUP {
// generate page
let mut values = Vec::with_capacity(VALUES_PER_PAGE);
let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
for _k in 0..VALUES_PER_PAGE {
let def_level = if rng.gen::<f32>() < null_density {
max_def_level - 1
} else {
max_def_level
};
if def_level == max_def_level {
// create the decimal value
let value = rng.gen_range(min..max);
                    // Parquet stores decimal values as big-endian bytes
let bytes = match column_desc.physical_type() {
Type::BYTE_ARRAY => {
                            // BYTE_ARRAY uses a variable-length
                            // (minimal two's-complement) representation
let big_int = BigInt::from(value);
big_int.to_signed_bytes_be()
}
Type::FIXED_LEN_BYTE_ARRAY => {
assert_eq!(column_desc.type_length(), 16);
                            // FIXED_LEN_BYTE_ARRAY uses a fixed size:
                            // all 16 bytes of the i128
value.to_be_bytes().to_vec()
}
_ => unimplemented!(),
};
let value = T::T::from(bytes);
values.push(value);
}
def_levels.push(def_level);
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
page_builder.add_values::<T>(encoding, &values);
column_chunk_pages.push(page_builder.consume());
}
pages.push(column_chunk_pages);
}
InMemoryPageIterator::new(pages)
}
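
/// Builds pages of random primitive values in `min..max` using the given
/// encoding; `null_density` is the probability that a slot is NULL.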
fn build_encoded_primitive_page_iterator<T>(
column_desc: ColumnDescPtr,
null_density: f32,
encoding: Encoding,
min: usize,
max: usize,
) -> impl PageIterator + Clone
where
T: parquet::data_type::DataType,
T::T: SampleUniform + FromPrimitive,
{
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![0; VALUES_PER_PAGE];
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
for _i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = Vec::new();
for _j in 0..PAGES_PER_GROUP {
// generate page
let mut values = Vec::with_capacity(VALUES_PER_PAGE);
let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
for _k in 0..VALUES_PER_PAGE {
let def_level = if rng.gen::<f32>() < null_density {
max_def_level - 1
} else {
max_def_level
};
if def_level == max_def_level {
let value =
FromPrimitive::from_usize(rng.gen_range(min..max)).unwrap();
values.push(value);
}
def_levels.push(def_level);
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
page_builder.add_values::<T>(encoding, &values);
column_chunk_pages.push(page_builder.consume());
}
pages.push(column_chunk_pages);
}
InMemoryPageIterator::new(pages)
}
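
/// Builds dictionary-encoded pages of primitive values drawn from a small
/// pool of unique values, prepending the dictionary page to each column
/// chunk.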
fn build_dictionary_encoded_primitive_page_iterator<T>(
column_desc: ColumnDescPtr,
null_density: f32,
) -> impl PageIterator + Clone
where
T: parquet::data_type::DataType,
T::T: SampleUniform + FromPrimitive + Copy,
{
use parquet::encoding::{DictEncoder, Encoder};
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![0; VALUES_PER_PAGE];
    // generate a pool of unique values equal to 1% of VALUES_PER_PAGE
const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
let unique_values: Vec<T::T> = (0..NUM_UNIQUE_VALUES)
.map(|x| FromPrimitive::from_usize(x + 1).unwrap())
.collect::<Vec<_>>();
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
for _i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = VecDeque::new();
let mut dict_encoder = DictEncoder::<T>::new(column_desc.clone());
// add data pages
for _j in 0..PAGES_PER_GROUP {
// generate page
let mut values = Vec::with_capacity(VALUES_PER_PAGE);
let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
for _k in 0..VALUES_PER_PAGE {
let def_level = if rng.gen::<f32>() < null_density {
max_def_level - 1
} else {
max_def_level
};
if def_level == max_def_level {
// select random value from list of unique values
let value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
values.push(value);
}
def_levels.push(def_level);
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
let _ = dict_encoder.put(&values);
let indices = dict_encoder
.write_indices()
.expect("write_indices() should be OK");
page_builder.add_indices(indices);
column_chunk_pages.push_back(page_builder.consume());
}
// add dictionary page
let dict = dict_encoder
.write_dict()
.expect("write_dict() should be OK");
let dict_page = parquet::column::page::Page::DictionaryPage {
buf: dict,
num_values: dict_encoder.num_entries() as u32,
encoding: Encoding::RLE_DICTIONARY,
is_sorted: false,
};
column_chunk_pages.push_front(dict_page);
pages.push(column_chunk_pages.into());
}
InMemoryPageIterator::new(pages)
}
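
/// Builds PLAIN-encoded string pages whose values embed the row group, page,
/// and row indices, so every value is distinct.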
fn build_plain_encoded_string_page_iterator(
column_desc: ColumnDescPtr,
null_density: f32,
) -> impl PageIterator + Clone {
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![0; VALUES_PER_PAGE];
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
for i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = Vec::new();
for j in 0..PAGES_PER_GROUP {
// generate page
let mut values = Vec::with_capacity(VALUES_PER_PAGE);
let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
for k in 0..VALUES_PER_PAGE {
let def_level = if rng.gen::<f32>() < null_density {
max_def_level - 1
} else {
max_def_level
};
if def_level == max_def_level {
let string_value =
format!("Test value {k}, row group: {i}, page: {j}");
values
.push(parquet::data_type::ByteArray::from(string_value.as_str()));
}
def_levels.push(def_level);
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
column_chunk_pages.push(page_builder.consume());
}
pages.push(column_chunk_pages);
}
InMemoryPageIterator::new(pages)
}
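
/// Builds RLE_DICTIONARY-encoded string pages drawn from a small pool of
/// unique values, prepending the dictionary page to each column chunk.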
fn build_dictionary_encoded_string_page_iterator(
column_desc: ColumnDescPtr,
null_density: f32,
) -> impl PageIterator + Clone {
use parquet::encoding::{DictEncoder, Encoder};
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![0; VALUES_PER_PAGE];
    // generate a pool of unique values equal to 1% of VALUES_PER_PAGE
const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
let unique_values = (0..NUM_UNIQUE_VALUES)
.map(|x| format!("Dictionary value {x}"))
.collect::<Vec<_>>();
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
for _i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = VecDeque::new();
let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
// add data pages
for _j in 0..PAGES_PER_GROUP {
// generate page
let mut values = Vec::with_capacity(VALUES_PER_PAGE);
let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
for _k in 0..VALUES_PER_PAGE {
let def_level = if rng.gen::<f32>() < null_density {
max_def_level - 1
} else {
max_def_level
};
if def_level == max_def_level {
// select random value from list of unique values
let string_value =
unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
values.push(parquet::data_type::ByteArray::from(string_value));
}
def_levels.push(def_level);
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
let _ = dict_encoder.put(&values);
let indices = dict_encoder
.write_indices()
.expect("write_indices() should be OK");
page_builder.add_indices(indices);
column_chunk_pages.push_back(page_builder.consume());
}
// add dictionary page
let dict = dict_encoder
.write_dict()
.expect("write_dict() should be OK");
let dict_page = parquet::column::page::Page::DictionaryPage {
buf: dict,
num_values: dict_encoder.num_entries() as u32,
encoding: Encoding::RLE_DICTIONARY,
is_sorted: false,
};
column_chunk_pages.push_front(dict_page);
pages.push(column_chunk_pages.into());
}
InMemoryPageIterator::new(pages)
}
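
/// Builds pages for the `string_list` column. Definition levels encode a
/// NULL list (0), an empty list (1), a NULL element (2), or a present
/// element (3); repetition level 0 starts a new list and 1 continues it.
/// List lengths are drawn from `0..MAX_LIST_LEN`.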
fn build_string_list_page_iterator(
column_desc: ColumnDescPtr,
null_density: f32,
) -> impl PageIterator + Clone {
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
assert_eq!(max_def_level, 3);
assert_eq!(max_rep_level, 1);
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
for i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = Vec::new();
for j in 0..PAGES_PER_GROUP {
// generate page
let mut values: Vec<ByteArray> =
Vec::with_capacity(VALUES_PER_PAGE * MAX_LIST_LEN);
let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE * MAX_LIST_LEN);
let mut rep_levels = Vec::with_capacity(VALUES_PER_PAGE * MAX_LIST_LEN);
for k in 0..VALUES_PER_PAGE {
rep_levels.push(0);
if rng.gen::<f32>() < null_density {
// Null list
def_levels.push(0);
continue;
}
let len = rng.gen_range(0..MAX_LIST_LEN);
if len == 0 {
// Empty list
def_levels.push(1);
continue;
}
(1..len).for_each(|_| rep_levels.push(1));
for l in 0..len {
if rng.gen::<f32>() < null_density {
// Null element
def_levels.push(2);
} else {
def_levels.push(3);
let value =
format!("Test value {k}[{l}], row group: {i}, page: {j}");
values.push(value.as_str().into());
}
}
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
column_chunk_pages.push(page_builder.consume());
}
pages.push(column_chunk_pages);
}
InMemoryPageIterator::new(pages)
}
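
/// Reads batches of `BATCH_SIZE` rows until the reader is exhausted and
/// returns the total row count.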
fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
    // test procedure: read data in batches of BATCH_SIZE until no more data
let mut total_count = 0;
loop {
let array = array_reader.next_batch(BATCH_SIZE);
let array_len = array.unwrap().len();
total_count += array_len;
if array_len < BATCH_SIZE {
break;
}
}
total_count
}
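
/// Alternates between reading and skipping `BATCH_SIZE` records until the
/// reader is exhausted, returning the total count processed.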
fn bench_array_reader_skip(mut array_reader: Box<dyn ArrayReader>) -> usize {
    // test procedure: alternately read and skip batches of BATCH_SIZE
    // until no more data
let mut total_count = 0;
let mut skip = false;
let mut array_len;
loop {
if skip {
array_len = array_reader.skip_records(BATCH_SIZE).unwrap();
} else {
let array = array_reader.next_batch(BATCH_SIZE);
array_len = array.unwrap().len();
}
total_count += array_len;
skip = !skip;
if array_len < BATCH_SIZE {
break;
}
}
total_count
}
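
/// Creates a `PrimitiveArrayReader` matching the column's INT32 or INT64
/// physical type.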
fn create_primitive_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::PrimitiveArrayReader;
match column_desc.physical_type() {
Type::INT32 => {
let reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc,
None,
)
.unwrap();
Box::new(reader)
}
Type::INT64 => {
let reader = PrimitiveArrayReader::<Int64Type>::new(
Box::new(page_iterator),
column_desc,
None,
)
.unwrap();
Box::new(reader)
}
_ => unreachable!(),
}
}
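
/// Creates a reader for decimal columns stored as bytes, dispatching on the
/// BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY physical type.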
fn create_decimal_by_bytes_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
let physical_type = column_desc.physical_type();
match physical_type {
Type::BYTE_ARRAY => {
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}
Type::FIXED_LEN_BYTE_ARRAY => {
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None)
.unwrap()
}
_ => unimplemented!(),
}
}
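
/// Creates a variable-length byte array reader; the string columns in the
/// test schema are UTF8, so it produces string arrays.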
fn create_string_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}
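
/// Creates a reader that decodes dictionary-encoded pages directly into
/// `Dictionary(Int32, Utf8)` arrays.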
fn create_string_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
let arrow_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
make_byte_array_dictionary_reader(
Box::new(page_iterator),
column_desc,
Some(arrow_type),
)
.unwrap()
}
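
/// Wraps a string element reader in a `ListArrayReader<i32>` configured for
/// the `string_list` column of the test schema.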
fn create_string_list_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
let items = create_string_byte_array_reader(page_iterator, column_desc);
let field = Field::new("item", DataType::Utf8, true);
let data_type = DataType::List(Arc::new(field));
Box::new(ListArrayReader::<i32>::new(items, data_type, 2, 1, true))
}
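
/// Benchmarks reading decimals stored as bytes: mandatory and optional
/// columns, with and without NULLs, all PLAIN encoded.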
fn bench_byte_decimal<T>(
group: &mut BenchmarkGroup<WallTime>,
mandatory_column_desc: &ColumnDescPtr,
optional_column_desc: &ColumnDescPtr,
min: i128,
max: i128,
) where
T: parquet::data_type::DataType,
T::T: From<Vec<u8>>,
{
    // all variants below use PLAIN encoding
let mut count: usize = 0;
// plain encoded, no NULLs
let data = build_encoded_decimal_bytes_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
Encoding::PLAIN,
min,
max,
);
group.bench_function("plain encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_decimal_by_bytes_reader(
data.clone(),
mandatory_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let data = build_encoded_decimal_bytes_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Encoding::PLAIN,
min,
max,
);
group.bench_function("plain encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_decimal_by_bytes_reader(
data.clone(),
optional_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
    // plain encoded, half NULLs
let data = build_encoded_decimal_bytes_page_iterator::<T>(
optional_column_desc.clone(),
0.5,
Encoding::PLAIN,
min,
max,
);
group.bench_function("plain encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_decimal_by_bytes_reader(
data.clone(),
optional_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
}
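
/// Benchmarks reading primitive columns across PLAIN, DELTA_BINARY_PACKED
/// (read and skip), and dictionary encodings, with varying NULL densities.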
fn bench_primitive<T>(
group: &mut BenchmarkGroup<WallTime>,
mandatory_column_desc: &ColumnDescPtr,
optional_column_desc: &ColumnDescPtr,
min: usize,
max: usize,
) where
T: parquet::data_type::DataType,
T::T: SampleUniform + FromPrimitive + Copy,
{
let mut count: usize = 0;
// plain encoded, no NULLs
let data = build_encoded_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
Encoding::PLAIN,
min,
max,
);
group.bench_function("plain encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_primitive_array_reader(
data.clone(),
mandatory_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let data = build_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Encoding::PLAIN,
min,
max,
);
group.bench_function("plain encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// plain encoded, half NULLs
let data = build_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.5,
Encoding::PLAIN,
min,
max,
);
group.bench_function("plain encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// binary packed, no NULLs
let data = build_encoded_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
min,
max,
);
group.bench_function("binary packed, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_primitive_array_reader(
data.clone(),
mandatory_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let data = build_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
min,
max,
);
group.bench_function("binary packed, optional, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
    // binary packed skip, no NULLs
let data = build_encoded_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
min,
max,
);
group.bench_function("binary packed skip, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_primitive_array_reader(
data.clone(),
mandatory_column_desc.clone(),
);
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let data = build_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
min,
max,
);
group.bench_function("binary packed skip, optional, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// binary packed, half NULLs
let data = build_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.5,
Encoding::DELTA_BINARY_PACKED,
min,
max,
);
group.bench_function("binary packed, optional, half NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// dictionary encoded, no NULLs
let data = build_dictionary_encoded_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
);
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_primitive_array_reader(
data.clone(),
mandatory_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let data = build_dictionary_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
);
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// dictionary encoded, half NULLs
let data = build_dictionary_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.5,
);
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
}
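
/// Registers decimal benchmarks for the four physical representations:
/// INT32, INT64, BYTE_ARRAY, and FIXED_LEN_BYTE_ARRAY.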
fn decimal_benches(c: &mut Criterion) {
let schema = build_test_schema();
// parquet int32, logical type decimal(8,2)
let mandatory_decimal1_leaf_desc = schema.column(6);
let optional_decimal1_leaf_desc = schema.column(7);
let mut group = c.benchmark_group("arrow_array_reader/INT32/Decimal128Array");
bench_primitive::<Int32Type>(
&mut group,
&mandatory_decimal1_leaf_desc,
&optional_decimal1_leaf_desc,
// precision is 8: the max is 99999999
9999000,
9999999,
);
group.finish();
// parquet int64, logical type decimal(16,2)
let mut group = c.benchmark_group("arrow_array_reader/INT64/Decimal128Array");
let mandatory_decimal2_leaf_desc = schema.column(8);
let optional_decimal2_leaf_desc = schema.column(9);
bench_primitive::<Int64Type>(
&mut group,
&mandatory_decimal2_leaf_desc,
&optional_decimal2_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();
// parquet BYTE_ARRAY, logical type decimal(16,2)
let mut group = c.benchmark_group("arrow_array_reader/BYTE_ARRAY/Decimal128Array");
let mandatory_decimal3_leaf_desc = schema.column(10);
let optional_decimal3_leaf_desc = schema.column(11);
bench_byte_decimal::<ByteArrayType>(
&mut group,
&mandatory_decimal3_leaf_desc,
&optional_decimal3_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();
let mut group =
c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array");
let mandatory_decimal4_leaf_desc = schema.column(12);
let optional_decimal4_leaf_desc = schema.column(13);
bench_byte_decimal::<FixedLenByteArrayType>(
&mut group,
&mandatory_decimal4_leaf_desc,
&optional_decimal4_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();
}
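
/// Registers the int32/int64 primitive, string, string dictionary, and list
/// benchmarks.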
fn add_benches(c: &mut Criterion) {
let mut count: usize = 0;
let schema = build_test_schema();
let mandatory_int32_column_desc = schema.column(0);
let optional_int32_column_desc = schema.column(1);
let mandatory_string_column_desc = schema.column(2);
let optional_string_column_desc = schema.column(3);
let mandatory_int64_column_desc = schema.column(4);
let optional_int64_column_desc = schema.column(5);
let string_list_desc = schema.column(14);
// primitive / int32 benchmarks
// =============================
let mut group = c.benchmark_group("arrow_array_reader/Int32Array");
bench_primitive::<Int32Type>(
&mut group,
&mandatory_int32_column_desc,
&optional_int32_column_desc,
0,
1000,
);
group.finish();
// primitive / int64 benchmarks
// =============================
let mut group = c.benchmark_group("arrow_array_reader/Int64Array");
bench_primitive::<Int64Type>(
&mut group,
&mandatory_int64_column_desc,
&optional_int64_column_desc,
0,
1000,
);
group.finish();
    // string benchmarks
    // =============================
let mut group = c.benchmark_group("arrow_array_reader/StringArray");
// string, plain encoded, no NULLs
let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
mandatory_string_column_desc.clone(),
0.0,
);
group.bench_function("plain encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
optional_string_column_desc.clone(),
0.0,
);
group.bench_function("plain encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// string, plain encoded, half NULLs
let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
optional_string_column_desc.clone(),
0.5,
);
group.bench_function("plain encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
plain_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// string, dictionary encoded, no NULLs
let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
mandatory_string_column_desc.clone(),
0.0,
);
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
optional_string_column_desc.clone(),
0.0,
);
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
optional_string_column_desc.clone(),
0.5,
);
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
group.finish();
    // string dictionary benchmarks
    // =============================
let mut group = c.benchmark_group("arrow_array_reader/StringDictionary");
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
group.finish();
    // list benchmarks
    // =============================
let list_data = build_string_list_page_iterator(string_list_desc.clone(), 0.);
let mut group = c.benchmark_group("arrow_array_reader/ListArray");
group.bench_function("plain encoded optional strings no NULLs", |b| {
b.iter(|| {
let reader =
create_string_list_reader(list_data.clone(), string_list_desc.clone());
count = bench_array_reader(reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
let list_data = build_string_list_page_iterator(string_list_desc.clone(), 0.5);
group.bench_function("plain encoded optional strings half NULLs", |b| {
b.iter(|| {
let reader =
create_string_list_reader(list_data.clone(), string_list_desc.clone());
count = bench_array_reader(reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
}
criterion_group!(benches, add_benches, decimal_benches);
criterion_main!(benches);