// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
use crate::basic::{Encoding, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
metadata::*,
properties::{ReaderProperties, ReaderPropertiesPtr},
reader::*,
statistics,
};
use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::collections::VecDeque;
use std::iter;
use std::{fs::File, io::Read, path::Path, sync::Arc};
use thrift::protocol::TCompactInputProtocol;
impl TryFrom<File> for SerializedFileReader<File> {
type Error = ParquetError;
fn try_from(file: File) -> Result<Self> {
Self::new(file)
}
}
impl TryFrom<&Path> for SerializedFileReader<File> {
type Error = ParquetError;
fn try_from(path: &Path) -> Result<Self> {
let file = File::open(path)?;
Self::try_from(file)
}
}
impl TryFrom<String> for SerializedFileReader<File> {
type Error = ParquetError;
fn try_from(path: String) -> Result<Self> {
Self::try_from(Path::new(&path))
}
}
impl TryFrom<&str> for SerializedFileReader<File> {
type Error = ParquetError;
fn try_from(path: &str) -> Result<Self> {
Self::try_from(Path::new(&path))
}
}
/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
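///
/// A minimal sketch of iterating all rows (hedged: `data.parquet` is a
/// placeholder path and errors are unwrapped for brevity):
///
/// ```no_run
/// use parquet::file::serialized_reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
/// for row in reader {
///     println!("{}", row.unwrap());
/// }
/// ```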
impl IntoIterator for SerializedFileReader<File> {
type Item = Result<Row>;
type IntoIter = RowIter<'static>;
fn into_iter(self) -> Self::IntoIter {
RowIter::from_file_into(Box::new(self))
}
}
// ----------------------------------------------------------------------
// Implementations of file & row group readers
/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
chunk_reader: Arc<R>,
metadata: Arc<ParquetMetaData>,
props: ReaderPropertiesPtr,
}
/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned
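///
/// A minimal sketch of such a predicate (the closure below, which keeps only the
/// first row group, is purely illustrative):
///
/// ```
/// use parquet::file::metadata::RowGroupMetaData;
/// use parquet::file::serialized_reader::ReadGroupPredicate;
///
/// let _predicate: ReadGroupPredicate =
///     Box::new(|_meta: &RowGroupMetaData, index: usize| index == 0);
/// ```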
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
/// A builder for [`ReadOptions`].
///
/// Predicates added to the builder are chained with 'AND' when filtering row groups.
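///
/// A minimal sketch of building read options (the predicate shown is illustrative only):
///
/// ```
/// use parquet::file::serialized_reader::ReadOptionsBuilder;
///
/// let _options = ReadOptionsBuilder::new()
///     .with_predicate(Box::new(|metadata, _index| metadata.num_rows() > 0))
///     .with_page_index()
///     .build();
/// ```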
#[derive(Default)]
pub struct ReadOptionsBuilder {
predicates: Vec<ReadGroupPredicate>,
enable_page_index: bool,
props: Option<ReaderProperties>,
}
impl ReadOptionsBuilder {
/// New builder
pub fn new() -> Self {
Self::default()
}
/// Add a predicate on row group metadata to the read options.
/// Only row groups that match the predicate will be scanned.
pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
self.predicates.push(predicate);
self
}
/// Add a range predicate that keeps only row groups whose midpoint offset lies in
/// the half-open byte range `[start, end)`, i.e. `{x | start <= x < end}`
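///
/// A minimal sketch (the byte offsets are illustrative only):
///
/// ```
/// use parquet::file::serialized_reader::ReadOptionsBuilder;
///
/// // Keep only row groups whose midpoint lies within the first 4 MiB of the file
/// let _options = ReadOptionsBuilder::new().with_range(0, 4 * 1024 * 1024).build();
/// ```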
pub fn with_range(mut self, start: i64, end: i64) -> Self {
assert!(start < end);
let predicate = move |rg: &RowGroupMetaData, _: usize| {
let mid = get_midpoint_offset(rg);
mid >= start && mid < end
};
self.predicates.push(Box::new(predicate));
self
}
/// Enable reading the page index structures described in
/// "[Column Index] Layout to Support Page Skipping"
///
/// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn with_page_index(mut self) -> Self {
self.enable_page_index = true;
self
}
/// Set the [`ReaderProperties`] configuration.
pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
self.props = Some(properties);
self
}
/// Seal the builder and return the read options
pub fn build(self) -> ReadOptions {
let props = self
.props
.unwrap_or_else(|| ReaderProperties::builder().build());
ReadOptions {
predicates: self.predicates,
enable_page_index: self.enable_page_index,
props,
}
}
}
/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
predicates: Vec<ReadGroupPredicate>,
enable_page_index: bool,
props: ReaderProperties,
}
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Creates a file reader from a Parquet file.
/// Returns an error if the Parquet file does not exist or is corrupt.
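///
/// A minimal sketch (hedged: `data.parquet` is a placeholder path):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::reader::FileReader;
/// use parquet::file::serialized_reader::SerializedFileReader;
///
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
/// println!("row groups: {}", reader.metadata().num_row_groups());
/// ```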
pub fn new(chunk_reader: R) -> Result<Self> {
let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
let props = Arc::new(ReaderProperties::builder().build());
Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(metadata),
props,
})
}
/// Creates a file reader from a Parquet file with read options.
/// Returns an error if the Parquet file does not exist or is corrupt.
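///
/// A minimal sketch using [`ReadOptionsBuilder`] (hedged: `data.parquet` is a
/// placeholder path; whether an offset index is present depends on the writer):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::reader::FileReader;
/// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
///
/// let file = File::open("data.parquet").unwrap();
/// let options = ReadOptionsBuilder::new().with_page_index().build();
/// let reader = SerializedFileReader::new_with_options(file, options).unwrap();
/// println!("offset index read: {}", reader.metadata().offset_index().is_some());
/// ```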
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
let mut metadata_builder = ParquetMetaDataReader::new()
.parse_and_finish(&chunk_reader)?
.into_builder();
let mut predicates = options.predicates;
// Filter row groups based on the predicates
for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
let mut keep = true;
for predicate in &mut predicates {
if !predicate(&rg_meta, i) {
keep = false;
break;
}
}
if keep {
metadata_builder = metadata_builder.add_row_group(rg_meta);
}
}
let mut metadata = metadata_builder.build();
// If page indexes are desired, build them with the filtered set of row groups
if options.enable_page_index {
let mut reader =
ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
reader.read_page_indexes(&chunk_reader)?;
metadata = reader.finish()?;
}
Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(metadata),
props: Arc::new(options.props),
})
}
}
/// Get midpoint offset for a row group
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
let col = meta.column(0);
let mut offset = col.data_page_offset();
if let Some(dic_offset) = col.dictionary_page_offset() {
if offset > dic_offset {
offset = dic_offset
}
};
offset + meta.compressed_size() / 2
}
impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
fn metadata(&self) -> &ParquetMetaData {
&self.metadata
}
fn num_row_groups(&self) -> usize {
self.metadata.num_row_groups()
}
fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
let row_group_metadata = self.metadata.row_group(i);
// Row groups should be processed sequentially.
let props = Arc::clone(&self.props);
let f = Arc::clone(&self.chunk_reader);
Ok(Box::new(SerializedRowGroupReader::new(
f,
row_group_metadata,
self.metadata.offset_index().map(|x| x[i].as_slice()),
props,
)?))
}
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
RowIter::from_file(projection, self)
}
}
/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
offset_index: Option<&'a [OffsetIndexMetaData]>,
props: ReaderPropertiesPtr,
bloom_filters: Vec<Option<Sbbf>>,
}
impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
/// Creates new row group reader from a file, row group metadata and custom config.
pub fn new(
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
offset_index: Option<&'a [OffsetIndexMetaData]>,
props: ReaderPropertiesPtr,
) -> Result<Self> {
let bloom_filters = if props.read_bloom_filter() {
metadata
.columns()
.iter()
.map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
.collect::<Result<Vec<_>>>()?
} else {
iter::repeat(None).take(metadata.columns().len()).collect()
};
Ok(Self {
chunk_reader,
metadata,
offset_index,
props,
bloom_filters,
})
}
}
impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
fn metadata(&self) -> &RowGroupMetaData {
self.metadata
}
fn num_columns(&self) -> usize {
self.metadata.num_columns()
}
// TODO: fix PARQUET-816
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
let col = self.metadata.column(i);
let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
let props = Arc::clone(&self.props);
Ok(Box::new(SerializedPageReader::new_with_properties(
Arc::clone(&self.chunk_reader),
col,
usize::try_from(self.metadata.num_rows())?,
page_locations,
props,
)?))
}
/// Get the bloom filter for the `i`th column, if present
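///
/// A minimal sketch (hedged: assumes the file carries bloom filters and that
/// reading them is enabled via `ReaderProperties`; the path and the probed value
/// are placeholders):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::properties::ReaderProperties;
/// use parquet::file::reader::{FileReader, RowGroupReader};
/// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
///
/// // `set_read_bloom_filter` is assumed here to be the builder setter that enables bloom filter reads
/// let props = ReaderProperties::builder().set_read_bloom_filter(true).build();
/// let options = ReadOptionsBuilder::new().with_reader_properties(props).build();
/// let reader =
///     SerializedFileReader::new_with_options(File::open("data.parquet").unwrap(), options).unwrap();
/// let row_group = reader.get_row_group(0).unwrap();
/// if let Some(bloom) = row_group.get_column_bloom_filter(0) {
///     println!("value may be present: {}", bloom.check(&42_i64));
/// }
/// ```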
fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
self.bloom_filters[i].as_ref()
}
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
RowIter::from_row_group(projection, self)
}
}
/// Reads a [`PageHeader`] from the provided [`Read`]
pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
let mut prot = TCompactInputProtocol::new(input);
Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}
#[cfg(feature = "encryption")]
pub(crate) fn read_encrypted_page_header<T: Read>(
input: &mut T,
crypto_context: Arc<CryptoContext>,
) -> Result<PageHeader> {
let data_decryptor = crypto_context.data_decryptor();
let aad = crypto_context.create_page_header_aad()?;
let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
ParquetError::General(format!(
"Error decrypting column {}, decryptor may be wrong or missing",
crypto_context.column_ordinal
))
})?;
let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}
/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
/// If the page header is encrypted [`CryptoContext`] must be provided.
#[cfg(feature = "encryption")]
fn read_encrypted_page_header_len<T: Read>(
input: &mut T,
crypto_context: Option<Arc<CryptoContext>>,
) -> Result<(usize, PageHeader)> {
/// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
struct TrackedRead<R> {
inner: R,
bytes_read: usize,
}
impl<R: Read> Read for TrackedRead<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let v = self.inner.read(buf)?;
self.bytes_read += v;
Ok(v)
}
}
let mut tracked = TrackedRead {
inner: input,
bytes_read: 0,
};
let header = read_encrypted_page_header(&mut tracked, crypto_context.unwrap())?;
Ok((tracked.bytes_read, header))
}
/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
/// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
struct TrackedRead<R> {
inner: R,
bytes_read: usize,
}
impl<R: Read> Read for TrackedRead<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let v = self.inner.read(buf)?;
self.bytes_read += v;
Ok(v)
}
}
let mut tracked = TrackedRead {
inner: input,
bytes_read: 0,
};
let header = read_page_header(&mut tracked)?;
Ok((tracked.bytes_read, header))
}
/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
page_header: PageHeader,
buffer: Bytes,
physical_type: Type,
decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
// Verify the 32-bit CRC checksum of the page
#[cfg(feature = "crc")]
if let Some(expected_crc) = page_header.crc {
let crc = crc32fast::hash(&buffer);
if crc != expected_crc as u32 {
return Err(general_err!("Page CRC checksum mismatch"));
}
}
// For data page v2, the repetition and definition levels are stored
// uncompressed at the start of the page; `offset` accounts for those bytes
// so they are not passed to the decompressor.
//
// For all other page types the offset is 0, and the `true` flag means
// decompression is applied whenever a decompressor is defined.
let mut offset: usize = 0;
let mut can_decompress = true;
if let Some(ref header_v2) = page_header.data_page_header_v2 {
if header_v2.definition_levels_byte_length < 0
|| header_v2.repetition_levels_byte_length < 0
|| header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
> page_header.uncompressed_page_size
{
return Err(general_err!(
"DataPage v2 header contains implausible values \
for definition_levels_byte_length ({}) \
and repetition_levels_byte_length ({}) \
given DataPage header provides uncompressed_page_size ({})",
header_v2.definition_levels_byte_length,
header_v2.repetition_levels_byte_length,
page_header.uncompressed_page_size
));
}
offset = usize::try_from(
header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
)?;
// When the is_compressed flag is missing, the page is considered compressed
can_decompress = header_v2.is_compressed.unwrap_or(true);
}
// TODO: page header could be huge because of statistics. We should set a
// maximum page header size and abort if that is exceeded.
let buffer = match decompressor {
Some(decompressor) if can_decompress => {
let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
let decompressed_size = uncompressed_page_size - offset;
let mut decompressed = Vec::with_capacity(uncompressed_page_size);
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
if decompressed_size > 0 {
let compressed = &buffer.as_ref()[offset..];
decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
}
if decompressed.len() != uncompressed_page_size {
return Err(general_err!(
"Actual decompressed size doesn't match the expected one ({} vs {})",
decompressed.len(),
uncompressed_page_size
));
}
Bytes::from(decompressed)
}
_ => buffer,
};
let result = match page_header.type_ {
PageType::DICTIONARY_PAGE => {
let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
ParquetError::General("Missing dictionary page header".to_string())
})?;
let is_sorted = dict_header.is_sorted.unwrap_or(false);
Page::DictionaryPage {
buf: buffer,
num_values: dict_header.num_values.try_into()?,
encoding: Encoding::try_from(dict_header.encoding)?,
is_sorted,
}
}
PageType::DATA_PAGE => {
let header = page_header
.data_page_header
.ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
Page::DataPage {
buf: buffer,
num_values: header.num_values.try_into()?,
encoding: Encoding::try_from(header.encoding)?,
def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
statistics: statistics::from_thrift(physical_type, header.statistics)?,
}
}
PageType::DATA_PAGE_V2 => {
let header = page_header
.data_page_header_v2
.ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
let is_compressed = header.is_compressed.unwrap_or(true);
Page::DataPageV2 {
buf: buffer,
num_values: header.num_values.try_into()?,
encoding: Encoding::try_from(header.encoding)?,
num_nulls: header.num_nulls.try_into()?,
num_rows: header.num_rows.try_into()?,
def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
is_compressed,
statistics: statistics::from_thrift(physical_type, header.statistics)?,
}
}
_ => {
// Unsupported page types (e.g., INDEX_PAGE) cannot be decoded here.
unimplemented!("Page type {:?} is not supported", page_header.type_)
}
};
Ok(result)
}
enum SerializedPageReaderState {
Values {
/// The current byte offset in the reader
offset: usize,
/// The length of the chunk in bytes
remaining_bytes: usize,
/// If the next page header has already been "peeked", it is cached here
next_page_header: Option<Box<PageHeader>>,
/// The index of the data page within this column chunk
page_ordinal: usize,
/// Whether the next page is expected to be a dictionary page
require_dictionary: bool,
},
Pages {
/// Remaining page locations
page_locations: VecDeque<PageLocation>,
/// Remaining dictionary location if any
dictionary_page: Option<PageLocation>,
/// The total number of rows in this column chunk
total_rows: usize,
},
}
/// A serialized implementation for Parquet [`PageReader`].
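///
/// Typically obtained via [`RowGroupReader::get_column_page_reader`]. A minimal
/// sketch of iterating pages (hedged: `data.parquet` is a placeholder path):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::column::page::PageReader;
/// use parquet::file::reader::{FileReader, RowGroupReader};
/// use parquet::file::serialized_reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
/// let row_group = reader.get_row_group(0).unwrap();
/// let mut pages = row_group.get_column_page_reader(0).unwrap();
/// while let Some(page) = pages.get_next_page().unwrap() {
///     println!("page with {} values", page.num_values());
/// }
/// ```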
pub struct SerializedPageReader<R: ChunkReader> {
/// The chunk reader
reader: Arc<R>,
/// The compression codec for this column chunk. Only set if the compression is not `UNCOMPRESSED`.
decompressor: Option<Box<dyn Codec>>,
/// Column chunk type.
physical_type: Type,
state: SerializedPageReaderState,
/// Crypto context carrying objects required for decryption
#[cfg(feature = "encryption")]
crypto_context: Option<Arc<CryptoContext>>,
}
impl<R: ChunkReader> SerializedPageReader<R> {
/// Creates a new serialized page reader from a chunk reader and metadata
pub fn new(
reader: Arc<R>,
column_chunk_metadata: &ColumnChunkMetaData,
total_rows: usize,
page_locations: Option<Vec<PageLocation>>,
) -> Result<Self> {
let props = Arc::new(ReaderProperties::builder().build());
SerializedPageReader::new_with_properties(
reader,
column_chunk_metadata,
total_rows,
page_locations,
props,
)
}
/// Stub no-op implementation used when the `encryption` feature is disabled.
#[cfg(all(feature = "arrow", not(feature = "encryption")))]
pub(crate) fn add_crypto_context(
self,
_rg_idx: usize,
_column_idx: usize,
_parquet_meta_data: &ParquetMetaData,
_column_chunk_metadata: &ColumnChunkMetaData,
) -> Result<SerializedPageReader<R>> {
Ok(self)
}
/// Adds any necessary crypto context to this page reader, if encryption is enabled.
#[cfg(feature = "encryption")]
pub(crate) fn add_crypto_context(
mut self,
rg_idx: usize,
column_idx: usize,
parquet_meta_data: &ParquetMetaData,
column_chunk_metadata: &ColumnChunkMetaData,
) -> Result<SerializedPageReader<R>> {
let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
return Ok(self);
};
let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
return Ok(self);
};
let crypto_context =
CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
self.crypto_context = Some(Arc::new(crypto_context));
Ok(self)
}
/// Creates a new serialized page reader from a chunk reader and metadata, with custom properties.
pub fn new_with_properties(
reader: Arc<R>,
meta: &ColumnChunkMetaData,
total_rows: usize,
page_locations: Option<Vec<PageLocation>>,
props: ReaderPropertiesPtr,
) -> Result<Self> {
let decompressor = create_codec(meta.compression(), props.codec_options())?;
let (start, len) = meta.byte_range();
let state = match page_locations {
Some(locations) => {
let dictionary_page = match locations.first() {
Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
offset: start as i64,
compressed_page_size: (dict_offset.offset as u64 - start) as i32,
first_row_index: 0,
}),
_ => None,
};
SerializedPageReaderState::Pages {
page_locations: locations.into(),
dictionary_page,
total_rows,
}
}
None => SerializedPageReaderState::Values {
offset: usize::try_from(start)?,
remaining_bytes: usize::try_from(len)?,
next_page_header: None,
page_ordinal: 0,
require_dictionary: meta.dictionary_page_offset().is_some(),
},
};
Ok(Self {
reader,
decompressor,
state,
physical_type: meta.column_type(),
#[cfg(feature = "encryption")]
crypto_context: None,
})
}
/// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
/// Unlike page metadata, an offset can uniquely identify a page.
///
/// This is used when reading parquet with a row filter, where we don't want to decompress the same
/// page twice. The offset lets callers check whether the next page has already been cached or read.
#[cfg(test)]
fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
..
} => {
loop {
if *remaining_bytes == 0 {
return Ok(None);
}
return if let Some(header) = next_page_header.as_ref() {
if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
Ok(Some(*offset))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
Ok(Some(*offset))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};
*next_page_header = Some(Box::new(header));
page_meta
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
if let Some(page) = dictionary_page {
Ok(Some(usize::try_from(page.offset)?))
} else if let Some(page) = page_locations.front() {
Ok(Some(usize::try_from(page.offset)?))
} else {
Ok(None)
}
}
}
}
}
impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
type Item = Result<Page>;
fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
}
fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
if header_len > remaining_bytes {
return Err(eof_err!("Invalid page header"));
}
Ok(())
}
fn verify_page_size(
compressed_size: i32,
uncompressed_size: i32,
remaining_bytes: usize,
) -> Result<()> {
// The page's compressed size should not exceed the remaining bytes that are
// available to read. The page's uncompressed size is the expected size
// after decompression, which can never be negative.
if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
return Err(eof_err!("Invalid page header"));
}
Ok(())
}
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
loop {
let page = match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes: remaining,
next_page_header,
page_ordinal,
require_dictionary,
} => {
if *remaining == 0 {
return Ok(None);
}
let mut read = self.reader.get_read(*offset as u64)?;
let header = if let Some(header) = next_page_header.take() {
*header
} else {
#[cfg(feature = "encryption")]
let (header_len, header) = if self.crypto_context.is_some() {
let crypto_context = page_crypto_context(
&self.crypto_context,
*page_ordinal,
*require_dictionary,
)?;
read_encrypted_page_header_len(&mut read, crypto_context)?
} else {
read_page_header_len(&mut read)?
};
#[cfg(not(feature = "encryption"))]
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining)?;
*offset += header_len;
*remaining -= header_len;
header
};
verify_page_size(
header.compressed_page_size,
header.uncompressed_page_size,
*remaining,
)?;
let data_len = header.compressed_page_size as usize;
*offset += data_len;
*remaining -= data_len;
if header.type_ == PageType::INDEX_PAGE {
continue;
}
let mut buffer = Vec::with_capacity(data_len);
let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
if read != data_len {
return Err(eof_err!(
"Expected to read {} bytes of page, read only {}",
data_len,
read
));
}
#[cfg(feature = "encryption")]
let crypto_context = page_crypto_context(
&self.crypto_context,
*page_ordinal,
*require_dictionary,
)?;
#[cfg(feature = "encryption")]
let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
let decryptor = crypto_context.data_decryptor();
let aad = crypto_context.create_page_aad()?;
decryptor.decrypt(buffer.as_ref(), &aad)?
} else {
buffer
};
let page = decode_page(
header,
Bytes::from(buffer),
self.physical_type,
self.decompressor.as_mut(),
)?;
if page.is_data_page() {
*page_ordinal += 1;
} else if page.is_dictionary_page() {
*require_dictionary = false;
}
page
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
let front = match dictionary_page
.take()
.or_else(|| page_locations.pop_front())
{
Some(front) => front,
None => return Ok(None),
};
let page_len = usize::try_from(front.compressed_page_size)?;
let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
let header = PageHeader::read_from_in_protocol(&mut prot)?;
let offset = buffer.len() - prot.as_slice().len();
let bytes = buffer.slice(offset..);
decode_page(
header,
bytes,
self.physical_type,
self.decompressor.as_mut(),
)?
}
};
return Ok(Some(page));
}
}
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
..
} => {
loop {
if *remaining_bytes == 0 {
return Ok(None);
}
return if let Some(header) = next_page_header.as_ref() {
if let Ok(page_meta) = (&**header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining_bytes)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(page_meta) = (&header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};
*next_page_header = Some(Box::new(header));
page_meta
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
total_rows,
} => {
if dictionary_page.is_some() {
Ok(Some(PageMetadata {
num_rows: None,
num_levels: None,
is_dict: true,
}))
} else if let Some(page) = page_locations.front() {
let next_rows = page_locations
.get(1)
.map(|x| x.first_row_index as usize)
.unwrap_or(*total_rows);
Ok(Some(PageMetadata {
num_rows: Some(next_rows - page.first_row_index as usize),
num_levels: None,
is_dict: false,
}))
} else {
Ok(None)
}
}
}
}
fn skip_next_page(&mut self) -> Result<()> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
..
} => {
if let Some(buffered_header) = next_page_header.take() {
verify_page_size(
buffered_header.compressed_page_size,
buffered_header.uncompressed_page_size,
*remaining_bytes,
)?;
// The next page header has already been peeked, so just advance the offset
*offset += buffered_header.compressed_page_size as usize;
*remaining_bytes -= buffered_header.compressed_page_size as usize;
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining_bytes)?;
verify_page_size(
header.compressed_page_size,
header.uncompressed_page_size,
*remaining_bytes,
)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
*remaining_bytes -= header_len + data_page_size;
}
Ok(())
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
if dictionary_page.is_some() {
// If a dictionary page exists, consume it by taking it (sets to None)
dictionary_page.take();
} else {
// If no dictionary page exists, simply pop the data page from page_locations
page_locations.pop_front();
}
Ok(())
}
}
}
fn at_record_boundary(&mut self) -> Result<bool> {
match &mut self.state {
SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
SerializedPageReaderState::Pages { .. } => Ok(true),
}
}
}
#[cfg(feature = "encryption")]
fn page_crypto_context(
crypto_context: &Option<Arc<CryptoContext>>,
page_ordinal: usize,
dictionary_page: bool,
) -> Result<Option<Arc<CryptoContext>>> {
Ok(crypto_context.as_ref().map(|c| {
Arc::new(if dictionary_page {
c.for_dictionary_page()
} else {
c.with_page_ordinal(page_ordinal)
})
}))
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use bytes::Buf;
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::format::BoundaryOrder;
use crate::basic::{self, ColumnOrder, SortOrder};
use crate::column::reader::ColumnReader;
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
use crate::file::page_index::index::{Index, NativeIndex};
use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
use crate::file::writer::SerializedFileWriter;
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
use crate::util::test_common::file_util::{get_test_file, get_test_path};
use super::*;
#[test]
fn test_cursor_and_file_has_the_same_behaviour() {
let mut buf: Vec<u8> = Vec::new();
get_test_file("alltypes_plain.parquet")
.read_to_end(&mut buf)
.unwrap();
let cursor = Bytes::from(buf);
let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
let test_file = get_test_file("alltypes_plain.parquet");
let read_from_file = SerializedFileReader::new(test_file).unwrap();
let file_iter = read_from_file.get_row_iter(None).unwrap();
let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
for (a, b) in file_iter.zip(cursor_iter) {
assert_eq!(a.unwrap(), b.unwrap())
}
}
#[test]
fn test_file_reader_try_from() {
// Valid file path
let test_file = get_test_file("alltypes_plain.parquet");
let test_path_buf = get_test_path("alltypes_plain.parquet");
let test_path = test_path_buf.as_path();
let test_path_str = test_path.to_str().unwrap();
let reader = SerializedFileReader::try_from(test_file);
assert!(reader.is_ok());
let reader = SerializedFileReader::try_from(test_path);
assert!(reader.is_ok());
let reader = SerializedFileReader::try_from(test_path_str);
assert!(reader.is_ok());
let reader = SerializedFileReader::try_from(test_path_str.to_string());
assert!(reader.is_ok());
// Invalid file path
let test_path = Path::new("invalid.parquet");
let test_path_str = test_path.to_str().unwrap();
let reader = SerializedFileReader::try_from(test_path);
assert!(reader.is_err());
let reader = SerializedFileReader::try_from(test_path_str);
assert!(reader.is_err());
let reader = SerializedFileReader::try_from(test_path_str.to_string());
assert!(reader.is_err());
}
#[test]
fn test_file_reader_into_iter() {
let path = get_test_path("alltypes_plain.parquet");
let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
let iter = reader.into_iter();
let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
#[test]
fn test_file_reader_into_iter_project() {
let path = get_test_path("alltypes_plain.parquet");
let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
let schema = "message schema { OPTIONAL INT32 id; }";
let proj = parse_message_type(schema).ok();
let iter = reader.into_iter().project(proj).unwrap();
let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
#[test]
fn test_reuse_file_chunk() {
// This test covers the case of maintaining the correct start position in a file
// stream for each column reader after initializing and moving to the next one
// (without necessarily reading the entire column).
let test_file = get_test_file("alltypes_plain.parquet");
let reader = SerializedFileReader::new(test_file).unwrap();
let row_group = reader.get_row_group(0).unwrap();
let mut page_readers = Vec::new();
for i in 0..row_group.num_columns() {
page_readers.push(row_group.get_column_page_reader(i).unwrap());
}
// Now buffer each col reader, we do not expect any failures like:
// General("underlying Thrift error: end of file")
for mut page_reader in page_readers {
assert!(page_reader.get_next_page().is_ok());
}
}
#[test]
fn test_file_reader() {
let test_file = get_test_file("alltypes_plain.parquet");
let reader_result = SerializedFileReader::new(test_file);
assert!(reader_result.is_ok());
let reader = reader_result.unwrap();
// Test contents in Parquet metadata
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
// Test contents in file metadata
let file_metadata = metadata.file_metadata();
assert!(file_metadata.created_by().is_some());
assert_eq!(
file_metadata.created_by().unwrap(),
"impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
);
assert!(file_metadata.key_value_metadata().is_none());
assert_eq!(file_metadata.num_rows(), 8);
assert_eq!(file_metadata.version(), 1);
assert_eq!(file_metadata.column_orders(), None);
// Test contents in row group metadata
let row_group_metadata = metadata.row_group(0);
assert_eq!(row_group_metadata.num_columns(), 11);
assert_eq!(row_group_metadata.num_rows(), 8);
assert_eq!(row_group_metadata.total_byte_size(), 671);
// Check each column order
for i in 0..row_group_metadata.num_columns() {
assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
}
// Test row group reader
let row_group_reader_result = reader.get_row_group(0);
assert!(row_group_reader_result.is_ok());
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
assert_eq!(
row_group_reader.num_columns(),
row_group_metadata.num_columns()
);
assert_eq!(
row_group_reader.metadata().total_byte_size(),
row_group_metadata.total_byte_size()
);
// Test page readers
// TODO: test for every column
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
assert!(page_reader_0_result.is_ok());
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
let mut page_count = 0;
while let Some(page) = page_reader_0.get_next_page().unwrap() {
let is_expected_page = match page {
Page::DictionaryPage {
buf,
num_values,
encoding,
is_sorted,
} => {
assert_eq!(buf.len(), 32);
assert_eq!(num_values, 8);
assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
assert!(!is_sorted);
true
}
Page::DataPage {
buf,
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
statistics,
} => {
assert_eq!(buf.len(), 11);
assert_eq!(num_values, 8);
assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
assert_eq!(def_level_encoding, Encoding::RLE);
#[allow(deprecated)]
let expected_rep_level_encoding = Encoding::BIT_PACKED;
assert_eq!(rep_level_encoding, expected_rep_level_encoding);
assert!(statistics.is_none());
true
}
_ => false,
};
assert!(is_expected_page);
page_count += 1;
}
assert_eq!(page_count, 2);
}
#[test]
fn test_file_reader_datapage_v2() {
let test_file = get_test_file("datapage_v2.snappy.parquet");
let reader_result = SerializedFileReader::new(test_file);
assert!(reader_result.is_ok());
let reader = reader_result.unwrap();
// Test contents in Parquet metadata
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
// Test contents in file metadata
let file_metadata = metadata.file_metadata();
assert!(file_metadata.created_by().is_some());
assert_eq!(
file_metadata.created_by().unwrap(),
"parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
);
assert!(file_metadata.key_value_metadata().is_some());
assert_eq!(
file_metadata.key_value_metadata().to_owned().unwrap().len(),
1
);
assert_eq!(file_metadata.num_rows(), 5);
assert_eq!(file_metadata.version(), 1);
assert_eq!(file_metadata.column_orders(), None);
let row_group_metadata = metadata.row_group(0);
// Check each column order
for i in 0..row_group_metadata.num_columns() {
assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
}
// Test row group reader
let row_group_reader_result = reader.get_row_group(0);
assert!(row_group_reader_result.is_ok());
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
assert_eq!(
row_group_reader.num_columns(),
row_group_metadata.num_columns()
);
assert_eq!(
row_group_reader.metadata().total_byte_size(),
row_group_metadata.total_byte_size()
);
// Test page readers
// TODO: test for every column
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
assert!(page_reader_0_result.is_ok());
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
let mut page_count = 0;
while let Some(page) = page_reader_0.get_next_page().unwrap() {
let is_expected_page = match page {
Page::DictionaryPage {
buf,
num_values,
encoding,
is_sorted,
} => {
assert_eq!(buf.len(), 7);
assert_eq!(num_values, 1);
assert_eq!(encoding, Encoding::PLAIN);
assert!(!is_sorted);
true
}
Page::DataPageV2 {
buf,
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed,
statistics,
} => {
assert_eq!(buf.len(), 4);
assert_eq!(num_values, 5);
assert_eq!(encoding, Encoding::RLE_DICTIONARY);
assert_eq!(num_nulls, 1);
assert_eq!(num_rows, 5);
assert_eq!(def_levels_byte_len, 2);
assert_eq!(rep_levels_byte_len, 0);
assert!(is_compressed);
assert!(statistics.is_some());
true
}
_ => false,
};
assert!(is_expected_page);
page_count += 1;
}
assert_eq!(page_count, 2);
}
#[test]
fn test_file_reader_empty_compressed_datapage_v2() {
// this file has a compressed data page that decompresses to 0 bytes
let test_file = get_test_file("page_v2_empty_compressed.parquet");
let reader_result = SerializedFileReader::new(test_file);
assert!(reader_result.is_ok());
let reader = reader_result.unwrap();
// Test contents in Parquet metadata
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
// Test contents in file metadata
let file_metadata = metadata.file_metadata();
assert!(file_metadata.created_by().is_some());
assert_eq!(
file_metadata.created_by().unwrap(),
"parquet-cpp-arrow version 14.0.2"
);
assert!(file_metadata.key_value_metadata().is_some());
assert_eq!(
file_metadata.key_value_metadata().to_owned().unwrap().len(),
1
);
assert_eq!(file_metadata.num_rows(), 10);
assert_eq!(file_metadata.version(), 2);
let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
assert_eq!(
file_metadata.column_orders(),
Some(vec![expected_order].as_ref())
);
let row_group_metadata = metadata.row_group(0);
// Check each column order
for i in 0..row_group_metadata.num_columns() {
assert_eq!(file_metadata.column_order(i), expected_order);
}
// Test row group reader
let row_group_reader_result = reader.get_row_group(0);
assert!(row_group_reader_result.is_ok());
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
assert_eq!(
row_group_reader.num_columns(),
row_group_metadata.num_columns()
);
assert_eq!(
row_group_reader.metadata().total_byte_size(),
row_group_metadata.total_byte_size()
);
// Test page readers
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
assert!(page_reader_0_result.is_ok());
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
let mut page_count = 0;
while let Some(page) = page_reader_0.get_next_page().unwrap() {
let is_expected_page = match page {
Page::DictionaryPage {
buf,
num_values,
encoding,
is_sorted,
} => {
assert_eq!(buf.len(), 0);
assert_eq!(num_values, 0);
assert_eq!(encoding, Encoding::PLAIN);
assert!(!is_sorted);
true
}
Page::DataPageV2 {
buf,
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed,
statistics,
} => {
assert_eq!(buf.len(), 3);
assert_eq!(num_values, 10);
assert_eq!(encoding, Encoding::RLE_DICTIONARY);
assert_eq!(num_nulls, 10);
assert_eq!(num_rows, 10);
assert_eq!(def_levels_byte_len, 2);
assert_eq!(rep_levels_byte_len, 0);
assert!(is_compressed);
assert!(statistics.is_some());
true
}
_ => false,
};
assert!(is_expected_page);
page_count += 1;
}
assert_eq!(page_count, 2);
}
#[test]
fn test_file_reader_empty_datapage_v2() {
// this file has a 0-byte compressed data page that decompresses to 0 bytes
let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
let reader_result = SerializedFileReader::new(test_file);
assert!(reader_result.is_ok());
let reader = reader_result.unwrap();
// Test contents in Parquet metadata
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
// Test contents in file metadata
let file_metadata = metadata.file_metadata();
assert!(file_metadata.created_by().is_some());
assert_eq!(
file_metadata.created_by().unwrap(),
"parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
);
assert!(file_metadata.key_value_metadata().is_some());
assert_eq!(
file_metadata.key_value_metadata().to_owned().unwrap().len(),
2
);
assert_eq!(file_metadata.num_rows(), 1);
assert_eq!(file_metadata.version(), 1);
let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
assert_eq!(
file_metadata.column_orders(),
Some(vec![expected_order].as_ref())
);
let row_group_metadata = metadata.row_group(0);
// Check each column order
for i in 0..row_group_metadata.num_columns() {
assert_eq!(file_metadata.column_order(i), expected_order);
}
// Test row group reader
let row_group_reader_result = reader.get_row_group(0);
assert!(row_group_reader_result.is_ok());
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
assert_eq!(
row_group_reader.num_columns(),
row_group_metadata.num_columns()
);
assert_eq!(
row_group_reader.metadata().total_byte_size(),
row_group_metadata.total_byte_size()
);
// Test page readers
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
assert!(page_reader_0_result.is_ok());
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
let mut page_count = 0;
while let Some(page) = page_reader_0.get_next_page().unwrap() {
let is_expected_page = match page {
Page::DataPageV2 {
buf,
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed,
statistics,
} => {
assert_eq!(buf.len(), 2);
assert_eq!(num_values, 1);
assert_eq!(encoding, Encoding::PLAIN);
assert_eq!(num_nulls, 1);
assert_eq!(num_rows, 1);
assert_eq!(def_levels_byte_len, 2);
assert_eq!(rep_levels_byte_len, 0);
assert!(is_compressed);
assert!(statistics.is_none());
true
}
_ => false,
};
assert!(is_expected_page);
page_count += 1;
}
assert_eq!(page_count, 1);
}
fn get_serialized_page_reader<R: ChunkReader>(
file_reader: &SerializedFileReader<R>,
row_group: usize,
column: usize,
) -> Result<SerializedPageReader<R>> {
let row_group = {
let row_group_metadata = file_reader.metadata.row_group(row_group);
let props = Arc::clone(&file_reader.props);
let f = Arc::clone(&file_reader.chunk_reader);
SerializedRowGroupReader::new(
f,
row_group_metadata,
file_reader
.metadata
.offset_index()
.map(|x| x[row_group].as_slice()),
props,
)?
};
let col = row_group.metadata.column(column);
let page_locations = row_group
.offset_index
.map(|x| x[column].page_locations.clone());
let props = Arc::clone(&row_group.props);
SerializedPageReader::new_with_properties(
Arc::clone(&row_group.chunk_reader),
col,
usize::try_from(row_group.metadata.num_rows())?,
page_locations,
props,
)
}
#[test]
fn test_peek_next_page_offset_matches_actual() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let reader = SerializedFileReader::new(test_file)?;
let mut offset_set = HashSet::new();
let num_row_groups = reader.metadata.num_row_groups();
for row_group in 0..num_row_groups {
let num_columns = reader.metadata.row_group(row_group).num_columns();
for column in 0..num_columns {
let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
match &page_reader.state {
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
if let Some(page) = dictionary_page {
assert_eq!(page.offset as usize, page_offset);
} else if let Some(page) = page_locations.front() {
assert_eq!(page.offset as usize, page_offset);
} else {
unreachable!()
}
}
SerializedPageReaderState::Values {
offset,
next_page_header,
..
} => {
assert!(next_page_header.is_some());
assert_eq!(*offset, page_offset);
}
}
let page = page_reader.get_next_page()?;
assert!(page.is_some());
let newly_inserted = offset_set.insert(page_offset);
assert!(newly_inserted);
}
}
}
Ok(())
}
#[test]
fn test_page_iterator() {
let file = get_test_file("alltypes_plain.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
// read first page
let page = page_iterator.next();
assert!(page.is_some());
assert!(page.unwrap().is_ok());
// reach end of file
let page = page_iterator.next();
assert!(page.is_none());
let row_group_indices = Box::new(0..1);
let mut page_iterator =
FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
// read first page
let page = page_iterator.next();
assert!(page.is_some());
assert!(page.unwrap().is_ok());
// reach end of file
let page = page_iterator.next();
assert!(page.is_none());
}
#[test]
fn test_file_reader_key_value_metadata() {
let file = get_test_file("binary.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let metadata = file_reader
.metadata
.file_metadata()
.key_value_metadata()
.unwrap();
assert_eq!(metadata.len(), 3);
assert_eq!(metadata[0].key, "parquet.proto.descriptor");
assert_eq!(metadata[1].key, "writer.model.name");
assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
assert_eq!(metadata[2].key, "parquet.proto.class");
assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
}
#[test]
fn test_file_reader_optional_metadata() {
// file with optional metadata: bloom filters, encoding stats, column index and offset index.
let file = get_test_file("data_index_bloom_encoding_stats.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let row_group_metadata = file_reader.metadata.row_group(0);
let col0_metadata = row_group_metadata.column(0);
// test optional bloom filter offset
assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
// test page encoding stats
let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
assert_eq!(page_encoding_stats.count, 1);
// test optional column index offset
assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
// test optional offset index offset
assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
}
#[test]
fn test_file_reader_with_no_filter() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
// test initial number of row groups
let metadata = origin_reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
Ok(())
}
#[test]
fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| false))
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}
#[test]
fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
// test initial number of row groups
let metadata = origin_reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let mid = get_midpoint_offset(metadata.row_group(0));
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}
#[test]
fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
let metadata = origin_reader.metadata();
let mid = get_midpoint_offset(metadata.row_group(0));
// true, true predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| true))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.column_index().unwrap().len(), 1);
assert_eq!(metadata.offset_index().unwrap().len(), 1);
// true, false predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| true))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());
// false, true predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| false))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());
// false, false predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| false))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());
Ok(())
}
#[test]
fn test_file_reader_invalid_metadata() {
let data = [
255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
80, 65, 82, 49,
];
let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
assert_eq!(
ret.err().unwrap().to_string(),
"Parquet error: Could not parse metadata: bad data"
);
}
#[test]
// The page index info below was obtained with the Java parquet-tools:
//
// ```text
// parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
// row group 0:
// column index for column String:
// Boundary order: ASCENDING
// page-0:
//   null count  min    max
//   0           Hello  today
//
// offset index for column String:
// page-0:
//   offset  compressed size  first row index
//   4       152              0
// ```
fn test_page_index_reader() {
let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
let builder = ReadOptionsBuilder::new();
// enable reading the page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
// Test contents in Parquet metadata
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let column_index = metadata.column_index().unwrap();
// only one row group
assert_eq!(column_index.len(), 1);
let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
index
} else {
unreachable!()
};
assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
let index_in_pages = &index.indexes;
//only one page group
assert_eq!(index_in_pages.len(), 1);
let page0 = &index_in_pages[0];
let min = page0.min.as_ref().unwrap();
let max = page0.max.as_ref().unwrap();
assert_eq!(b"Hello", min.as_bytes());
assert_eq!(b"today", max.as_bytes());
let offset_indexes = metadata.offset_index().unwrap();
// only one row group
assert_eq!(offset_indexes.len(), 1);
let offset_index = &offset_indexes[0];
let page_offset = &offset_index[0].page_locations()[0];
assert_eq!(4, page_offset.offset);
assert_eq!(152, page_offset.compressed_page_size);
assert_eq!(0, page_offset.first_row_index);
}
#[test]
fn test_page_index_reader_out_of_order() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
let metadata = reader.metadata();
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let columns = metadata.row_group(0).columns();
let reversed: Vec<_> = columns.iter().cloned().rev().collect();
let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
let mut b = read_columns_indexes(&test_file, &reversed)
.unwrap()
.unwrap();
b.reverse();
assert_eq!(a, b);
let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
b.reverse();
assert_eq!(a, b);
}
#[test]
fn test_page_index_reader_all_type() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let builder = ReadOptionsBuilder::new();
// enable reading the page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
// Test contents in Parquet metadata
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let column_index = metadata.column_index().unwrap();
let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
// only one row group
assert_eq!(column_index.len(), 1);
let row_group_metadata = metadata.row_group(0);
//col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
assert!(!&column_index[0][0].is_sorted());
let boundary_order = &column_index[0][0].get_boundary_order();
assert!(boundary_order.is_some());
matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
if let Index::INT32(index) = &column_index[0][0] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 0),
BoundaryOrder::UNORDERED,
);
assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
} else {
unreachable!()
};
//col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
assert!(&column_index[0][1].is_sorted());
if let Index::BOOLEAN(index) = &column_index[0][1] {
assert_eq!(index.indexes.len(), 82);
assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
} else {
unreachable!()
};
//col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
assert!(&column_index[0][2].is_sorted());
if let Index::INT32(index) = &column_index[0][2] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 2),
BoundaryOrder::ASCENDING,
);
assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
} else {
unreachable!()
};
//col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
assert!(&column_index[0][3].is_sorted());
if let Index::INT32(index) = &column_index[0][3] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 3),
BoundaryOrder::ASCENDING,
);
assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
} else {
unreachable!()
};
//col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
assert!(&column_index[0][4].is_sorted());
if let Index::INT32(index) = &column_index[0][4] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 4),
BoundaryOrder::ASCENDING,
);
assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
} else {
unreachable!()
};
//col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
assert!(!&column_index[0][5].is_sorted());
if let Index::INT64(index) = &column_index[0][5] {
check_native_page_index(
index,
528,
get_row_group_min_max_bytes(row_group_metadata, 5),
BoundaryOrder::UNORDERED,
);
assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
} else {
unreachable!()
};
//col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
assert!(&column_index[0][6].is_sorted());
if let Index::FLOAT(index) = &column_index[0][6] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 6),
BoundaryOrder::ASCENDING,
);
assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
} else {
unreachable!()
};
//col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
assert!(!&column_index[0][7].is_sorted());
if let Index::DOUBLE(index) = &column_index[0][7] {
check_native_page_index(
index,
528,
get_row_group_min_max_bytes(row_group_metadata, 7),
BoundaryOrder::UNORDERED,
);
assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
} else {
unreachable!()
};
//col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
assert!(!&column_index[0][8].is_sorted());
if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
check_native_page_index(
index,
974,
get_row_group_min_max_bytes(row_group_metadata, 8),
BoundaryOrder::UNORDERED,
);
assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
} else {
unreachable!()
};
//col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
assert!(&column_index[0][9].is_sorted());
if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
check_native_page_index(
index,
352,
get_row_group_min_max_bytes(row_group_metadata, 9),
BoundaryOrder::ASCENDING,
);
assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
} else {
unreachable!()
};
//col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
// Note: per-page min/max values do not exist for this column.
assert!(!&column_index[0][10].is_sorted());
if let Index::NONE = &column_index[0][10] {
assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
} else {
unreachable!()
};
//col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
assert!(&column_index[0][11].is_sorted());
if let Index::INT32(index) = &column_index[0][11] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 11),
BoundaryOrder::ASCENDING,
);
assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
} else {
unreachable!()
};
//col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
assert!(!&column_index[0][12].is_sorted());
if let Index::INT32(index) = &column_index[0][12] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 12),
BoundaryOrder::UNORDERED,
);
assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
} else {
unreachable!()
};
}
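// A minimal sketch (not called by the tests above) of how the per-column page counts
// could be checked generically instead of column by column: every row group in the
// offset index should have one entry per column, each with at least one page location.
// It uses only standard metadata accessors (`offset_index`, `row_group`, `num_columns`)
// and is an illustrative helper, not part of the original test coverage.
#[allow(dead_code)]
fn assert_offset_index_shape(metadata: &ParquetMetaData) {
let offset_index = metadata.offset_index().expect("page index was not read");
assert_eq!(offset_index.len(), metadata.num_row_groups());
for (rg, columns) in offset_index.iter().enumerate() {
// one offset-index entry per column chunk in this row group
assert_eq!(columns.len(), metadata.row_group(rg).num_columns());
for column in columns {
assert!(!column.page_locations.is_empty());
}
}
}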
fn check_native_page_index<T: ParquetValueType>(
row_group_index: &NativeIndex<T>,
page_size: usize,
min_max: (&[u8], &[u8]),
boundary_order: BoundaryOrder,
) {
assert_eq!(row_group_index.indexes.len(), page_size);
assert_eq!(row_group_index.boundary_order, boundary_order);
// every page-level min/max must fall within the row-group-level min/max
assert!(row_group_index.indexes.iter().all(|x| {
x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
&& x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
}));
}
fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
let statistics = r.column(col_num).statistics().unwrap();
(
statistics.min_bytes_opt().unwrap_or_default(),
statistics.max_bytes_opt().unwrap_or_default(),
)
}
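// A minimal illustrative sketch of what `check_native_page_index` relies on:
// `ParquetValueType::try_from_le_slice` decodes the little-endian statistics bytes
// back into the native type so page-level values can be compared against
// row-group-level min/max. The literal value below is arbitrary.
#[test]
fn test_try_from_le_slice_roundtrip() {
let bytes = 7299i32.to_le_bytes();
assert_eq!(i32::try_from_le_slice(&bytes).unwrap(), 7299);
}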
#[test]
fn test_skip_next_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let builder = ReadOptionsBuilder::new();
// enable reading of the page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();
// use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
let mut vec = vec![];
// Step 1: Peek and ensure dictionary page is correctly identified
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
// Step 2: Call skip_next_page to skip the dictionary page
column_page_reader.skip_next_page().unwrap();
// Step 3: Read the next data page after skipping the dictionary page
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
// Step 4: Continue reading remaining data pages and verify correctness
// 352 data pages in total: the dictionary page was skipped and the first data
// page was already read above, so 351 data pages remain.
for _i in 0..351 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(!meta.is_dict); // Verify no dictionary page here
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}
// Step 5: Check if all pages are read
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());
// Step 6: Verify the number of data pages read (should be 351 data pages)
assert_eq!(vec.len(), 351);
}
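// A minimal sketch of a helper the dictionary-page and skip tests around it could
// share: drain a page reader, skipping dictionary pages and counting data pages.
// It uses only the `PageReader` methods exercised above (`peek_next_page`,
// `skip_next_page`, `get_next_page`). For 'string_col' of alltypes_tiny_pages.parquet
// it should return 352, e.g. `count_data_pages(&mut *column_page_reader)`.
#[allow(dead_code)]
fn count_data_pages(reader: &mut dyn PageReader) -> usize {
let mut data_pages = 0;
while let Some(meta) = reader.peek_next_page().unwrap() {
if meta.is_dict {
// dictionary pages are skipped, not counted
reader.skip_next_page().unwrap();
} else {
let page = reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
data_pages += 1;
}
}
data_pages
}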
#[test]
fn test_skip_page_with_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let builder = ReadOptionsBuilder::new();
// enable reading of the page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();
//use 'int_col', Boundary order: ASCENDING, total 325 pages.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
let mut vec = vec![];
for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
// check that all pages have been read
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());
assert_eq!(vec.len(), 163);
}
#[test]
fn test_skip_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
// use default SerializedFileReader without read offsetIndex
let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();
//use 'int_col', Boundary order: ASCENDING, total 325 pages.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
let mut vec = vec![];
for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.peek_next_page().unwrap().unwrap();
column_page_reader.skip_next_page().unwrap();
}
}
// check that all pages have been read
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());
assert_eq!(vec.len(), 163);
}
#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let builder = ReadOptionsBuilder::new();
// enable reading of the page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();
// use 'string_col': boundary order UNORDERED, 352 data pages and 1 dictionary page in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
let mut vec = vec![];
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// every page holds either 21 or 20 rows, except the last page.
if i != 351 {
assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
} else {
// the last page starts at row index 7290 and the file holds 7300 rows in total,
// so the last page contains the remaining 10 rows.
assert_eq!(meta.num_rows, Some(10));
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}
// check that all pages have been read
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());
assert_eq!(vec.len(), 352);
}
#[test]
fn test_peek_page_with_dictionary_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();
// use 'string_col': boundary order UNORDERED, 352 data pages and 1 dictionary page in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
let mut vec = vec![];
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// without an offset index only level counts are available from the page header, and
// every page holds either 21 or 20 levels, except the last page.
if i != 351 {
assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
} else {
// the last page starts at row index 7290 and the file holds 7300 rows in total,
// so the last page contains the remaining 10 values/levels.
assert_eq!(meta.num_levels, Some(10));
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}
// check that all pages have been read
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());
assert_eq!(vec.len(), 352);
}
#[test]
fn test_fixed_length_index() {
let message_type = "
message test_schema {
OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
}
";
let schema = parse_message_type(message_type).unwrap();
let mut out = Vec::with_capacity(1024);
let mut writer =
SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
let mut r = writer.next_row_group().unwrap();
let mut c = r.next_column().unwrap().unwrap();
c.typed::<FixedLenByteArrayType>()
.write_batch(
&[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
Some(&[1, 1, 0, 1]),
None,
)
.unwrap();
c.close().unwrap();
r.close().unwrap();
writer.close().unwrap();
let b = Bytes::from(out);
let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(b, options).unwrap();
let index = reader.metadata().column_index().unwrap();
// 1 row group
assert_eq!(index.len(), 1);
let c = &index[0];
// 1 column
assert_eq!(c.len(), 1);
match &c[0] {
Index::FIXED_LEN_BYTE_ARRAY(v) => {
assert_eq!(v.indexes.len(), 1);
let page_idx = &v.indexes[0];
assert_eq!(page_idx.null_count.unwrap(), 1);
assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
}
_ => unreachable!(),
}
}
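// A minimal sketch of the same in-memory round trip for a REQUIRED INT32 column,
// illustrating that the `Index` variant follows the column's physical type
// (`Index::INT32` here, `Index::FIXED_LEN_BYTE_ARRAY` above). The schema, values and
// single-page expectation are assumptions of this sketch, not taken from a test file.
#[test]
fn test_int32_index_round_trip_sketch() {
let message_type = "
message test_schema {
REQUIRED INT32 value;
}
";
let schema = parse_message_type(message_type).unwrap();
let mut out = Vec::with_capacity(1024);
let mut writer =
SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
let mut r = writer.next_row_group().unwrap();
let mut c = r.next_column().unwrap().unwrap();
c.typed::<Int32Type>()
.write_batch(&[-1, 0, 42], None, None)
.unwrap();
c.close().unwrap();
r.close().unwrap();
writer.close().unwrap();
let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(Bytes::from(out), options).unwrap();
let index = reader.metadata().column_index().unwrap();
match &index[0][0] {
Index::INT32(v) => {
// all three values fit in one data page, so there is one column-index entry
assert_eq!(v.indexes.len(), 1);
assert_eq!(v.indexes[0].min(), Some(&-1));
assert_eq!(v.indexes[0].max(), Some(&42));
}
_ => unreachable!(),
}
}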
#[test]
fn test_multi_gz() {
let file = get_test_file("concatenated_gzip_members.parquet");
let reader = SerializedFileReader::new(file).unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();
match row_group_reader.get_column_reader(0).unwrap() {
ColumnReader::Int64ColumnReader(mut reader) => {
let mut buffer = Vec::with_capacity(1024);
let mut def_levels = Vec::with_capacity(1024);
let (num_records, num_values, num_levels) = reader
.read_records(1024, Some(&mut def_levels), None, &mut buffer)
.unwrap();
assert_eq!(num_records, 513);
assert_eq!(num_values, 513);
assert_eq!(num_levels, 513);
let expected: Vec<i64> = (1..514).collect();
assert_eq!(&buffer, &expected);
}
_ => unreachable!(),
}
}
#[test]
fn test_byte_stream_split_extended() {
let path = format!(
"{}/byte_stream_split_extended.gzip.parquet",
arrow::util::test_util::parquet_test_data(),
);
let file = File::open(path).unwrap();
let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
// Use full schema as projected schema
let mut iter = reader
.get_row_iter(None)
.expect("Failed to create row iterator");
let mut start = 0;
let end = reader.metadata().file_metadata().num_rows();
let check_row = |row: Result<Row, ParquetError>| {
assert!(row.is_ok());
let r = row.unwrap();
assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
};
while start < end {
match iter.next() {
Some(row) => check_row(row),
None => break,
};
start += 1;
}
}
#[test]
fn test_filtered_rowgroup_metadata() {
let message_type = "
message test_schema {
REQUIRED INT32 a;
}
";
let schema = Arc::new(parse_message_type(message_type).unwrap());
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build(),
);
let mut file: File = tempfile::tempfile().unwrap();
let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
let data = [1, 2, 3, 4, 5];
// write 5 row groups
for idx in 0..5 {
let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
let mut row_group_writer = file_writer.next_row_group().unwrap();
if let Some(mut writer) = row_group_writer.next_column().unwrap() {
writer
.typed::<Int32Type>()
.write_batch(data_i.as_slice(), None, None)
.unwrap();
writer.close().unwrap();
}
row_group_writer.close().unwrap();
file_writer.flushed_row_groups();
}
let file_metadata = file_writer.close().unwrap();
assert_eq!(file_metadata.num_rows, 25);
assert_eq!(file_metadata.row_groups.len(), 5);
// read only the 3rd row group
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
.build();
let reader =
SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
.unwrap();
let metadata = reader.metadata();
// check we got the expected row group
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.row_group(0).ordinal(), Some(2));
// check we only got the relevant page indexes
assert!(metadata.column_index().is_some());
assert!(metadata.offset_index().is_some());
assert_eq!(metadata.column_index().unwrap().len(), 1);
assert_eq!(metadata.offset_index().unwrap().len(), 1);
let col_idx = metadata.column_index().unwrap();
let off_idx = metadata.offset_index().unwrap();
let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
let pg_idx = &col_idx[0][0];
let off_idx_i = &off_idx[0][0];
// test that we got the index matching the row group
match pg_idx {
Index::INT32(int_idx) => {
let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
}
_ => panic!("wrong stats type"),
}
// check offset index matches too
assert_eq!(
off_idx_i.page_locations[0].offset,
metadata.row_group(0).column(0).data_page_offset()
);
// read non-contiguous row groups
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
.build();
let reader =
SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
.unwrap();
let metadata = reader.metadata();
// check we got the expected row groups
assert_eq!(metadata.num_row_groups(), 2);
assert_eq!(metadata.row_group(0).ordinal(), Some(1));
assert_eq!(metadata.row_group(1).ordinal(), Some(3));
// check we only got the relevant page indexes
assert!(metadata.column_index().is_some());
assert!(metadata.offset_index().is_some());
assert_eq!(metadata.column_index().unwrap().len(), 2);
assert_eq!(metadata.offset_index().unwrap().len(), 2);
let col_idx = metadata.column_index().unwrap();
let off_idx = metadata.offset_index().unwrap();
for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
let pg_idx = &col_idx_i[0];
let off_idx_i = &off_idx[i][0];
// test that we got the index matching the row group
match pg_idx {
Index::INT32(int_idx) => {
let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
}
_ => panic!("wrong stats type"),
}
// check offset index matches too
assert_eq!(
off_idx_i.page_locations[0].offset,
metadata.row_group(i).column(0).data_page_offset()
);
}
}
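// A minimal sketch of filtering by the row-group *index* (the second argument the
// predicate receives) rather than by the ordinal stored in the metadata, using the
// same builder API as `test_filtered_rowgroup_metadata`. The test file used here has
// a single row group, so a predicate that keeps only index 1 selects nothing.
#[test]
fn test_filtered_rowgroup_by_index_sketch() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, idx| idx == 1))
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options).unwrap();
// the only row group has index 0, so nothing matches the predicate
assert_eq!(reader.metadata().num_row_groups(), 0);
}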
}