native/core/benches/parquet_read.rs (125 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. mod perf; use std::sync::Arc; use arrow::{array::ArrayData, buffer::Buffer}; use comet::parquet::{read::ColumnReader, util::jni::TypePromotionInfo}; use criterion::{criterion_group, criterion_main, Criterion}; use parquet::{ basic::{Encoding, Type as PhysicalType}, column::page::{PageIterator, PageReader}, data_type::Int32Type, schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, PrimitiveTypeBuilder, SchemaDescPtr, TypePtr, }, }; use comet::parquet::util::test_common::page_util::{ DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, }; use perf::FlamegraphProfiler; use rand::{prelude::StdRng, Rng, SeedableRng}; use zstd::zstd_safe::WriteBuf; fn bench(c: &mut Criterion) { let expected_num_values: usize = NUM_PAGES * VALUES_PER_PAGE; let mut group = c.benchmark_group("comet_parquet_read"); let schema = build_test_schema(); let pages = build_plain_int32_pages(schema.column(0), 0.0); group.bench_function("INT/PLAIN/NOT_NULL", |b| { let t = TypePtr::new( PrimitiveTypeBuilder::new("f", PhysicalType::INT32) .with_length(4) .build() .unwrap(), ); b.iter(|| { let cd = ColumnDescriptor::new(t.clone(), 0, 0, ColumnPath::from(Vec::new())); let promotion_info = TypePromotionInfo::new(PhysicalType::INT32, -1, -1, 32); let mut column_reader = TestColumnReader::new( cd, promotion_info, BATCH_SIZE, pages.clone(), expected_num_values, ); let mut total = 0; for batch in column_reader.by_ref() { total += batch.len(); ::std::mem::forget(batch); } assert_eq!(total, expected_num_values); }); }); } fn profiled() -> Criterion { Criterion::default().with_profiler(FlamegraphProfiler::new(100)) } criterion_group! { name = benches; config = profiled(); targets = bench } criterion_main!(benches); fn build_test_schema() -> SchemaDescPtr { use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor}; let message_type = " message test_schema { REQUIRED INT32 c1; OPTIONAL INT32 c2; } "; parse_message_type(message_type) .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) .unwrap() } fn seedable_rng() -> StdRng { StdRng::seed_from_u64(42) } // test data params const NUM_PAGES: usize = 1000; const VALUES_PER_PAGE: usize = 10_000; const BATCH_SIZE: usize = 4096; fn build_plain_int32_pages( column_desc: ColumnDescPtr, null_density: f32, ) -> impl PageIterator + Clone { let max_def_level = column_desc.max_def_level(); let max_rep_level = column_desc.max_rep_level(); let rep_levels = vec![0; VALUES_PER_PAGE]; let mut rng = seedable_rng(); let mut pages: Vec<parquet::column::page::Page> = Vec::new(); let mut int32_value = 0; for _ in 0..NUM_PAGES { // generate page let mut values = Vec::with_capacity(VALUES_PER_PAGE); let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); for _ in 0..VALUES_PER_PAGE { let def_level = if rng.random::<f32>() < null_density { max_def_level - 1 } else { max_def_level }; if def_level == max_def_level { int32_value += 1; values.push(int32_value); } def_levels.push(def_level); } let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); page_builder.add_rep_levels(max_rep_level, &rep_levels); page_builder.add_def_levels(max_def_level, &def_levels); page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values); pages.push(page_builder.consume()); } // Since `InMemoryPageReader` is not exposed from parquet crate, here we use // `InMemoryPageIterator` instead which is a Iter<Iter<Page>>. InMemoryPageIterator::new(vec![pages]) } struct TestColumnReader { inner: ColumnReader, pages: Box<dyn PageReader>, batch_size: usize, total_num_values: usize, total_num_values_read: usize, first_page_loaded: bool, } impl TestColumnReader { pub fn new( cd: ColumnDescriptor, promotion_info: TypePromotionInfo, batch_size: usize, mut page_iter: impl PageIterator + 'static, total_num_values: usize, ) -> Self { let reader = ColumnReader::get(cd, promotion_info, batch_size, false, false); let first = page_iter.next().unwrap().unwrap(); Self { inner: reader, pages: first, batch_size, total_num_values, total_num_values_read: 0, first_page_loaded: false, } } fn load_page(&mut self) { if let Some(page) = self.pages.get_next_page().unwrap() { let num_values = page.num_values() as usize; let buffer = Buffer::from_slice_ref(page.buffer().as_slice()); self.inner.set_page_v1(num_values, buffer, page.encoding()); } } } impl Iterator for TestColumnReader { type Item = ArrayData; fn next(&mut self) -> Option<Self::Item> { if self.total_num_values_read >= self.total_num_values { return None; } if !self.first_page_loaded { self.load_page(); self.first_page_loaded = true; } self.inner.reset_batch(); let total = ::std::cmp::min( self.batch_size, self.total_num_values - self.total_num_values_read, ); let mut left = total; while left > 0 { let (num_read, _) = self.inner.read_batch(left, 0); if num_read < left { self.load_page(); } left -= num_read; } self.total_num_values_read += total; Some(self.inner.current_batch().unwrap()) } }