parquet/src/file/metadata/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet metadata API
//!
//! Most users should use these structures to interact with Parquet metadata.
//! The [crate::format] module contains lower level structures generated from the
//! Parquet thrift definition.
//!
//! * [`ParquetMetaData`]: Top level metadata container, read from the Parquet
//!   file footer.
//!
//! * [`FileMetaData`]: File level metadata such as schema, row counts and
//!   version.
//!
//! * [`RowGroupMetaData`]: Metadata for each Row Group within a File, such as
//!   location and number of rows, and column chunks.
//!
//! * [`ColumnChunkMetaData`]: Metadata for each column chunk (primitive leaf)
//!   within a Row Group including encoding and compression information,
//!   number of values, statistics, etc.
//!
//! # APIs for working with Parquet Metadata
//!
//! The Parquet readers and writers in this crate handle reading and writing
//! metadata into parquet files. To work with metadata directly,
//! the following APIs are available:
//!
//! * [`ParquetMetaDataReader`] for reading
//! * [`ParquetMetaDataWriter`] for writing.
//!
//! [`ParquetMetaDataReader`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html
//! [`ParquetMetaDataWriter`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataWriter.html
//!
//! # Examples
//!
//! Please see [`external_metadata.rs`]
//!
//! [`external_metadata.rs`]: https://github.com/apache/arrow-rs/tree/master/parquet/examples/external_metadata.rs
//!
//! # Metadata Encodings and Structures
//!
//! There are three different encodings of Parquet Metadata in this crate:
//!
//! 1. `bytes`: encoded with the Thrift `TCompactProtocol` as defined in
//!    [parquet.thrift]
//!
//! 2. [`format`]: Rust structures automatically generated by the thrift compiler
//!    from [parquet.thrift]. These structures are low level and mirror
//!    the thrift definitions.
//!
//! 3. [`file::metadata`] (this module): Easier to use Rust structures
//!    with a more idiomatic API. Note that, confusingly, some but not all
//!    of these structures have the same name as the [`format`] structures.
//!
//! [`format`]: crate::format
//! [`file::metadata`]: crate::file::metadata
//! [parquet.thrift]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
//!
//! Graphically, this is how the different structures relate to each other:
//!
//! ```text
//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//!   ┌──────────────┐     │                 ┌───────────────────────┐  │
//! │ │ ColumnIndex  │                     │ │    ParquetMetaData    │
//!   └──────────────┘     │                 └───────────────────────┘  │
//! ┌──────────────┐ │ ┌────────────────┐  │ ┌───────────────────────┐
//! │   ..0x24..   │ ◀────▶ │ OffsetIndex │ │ ◀────▶ │ ParquetMetaData │  │
//! └──────────────┘ │ └────────────────┘  │ └───────────────────────┘
//!        ...       │        ...             │          ...             │
//! │ ┌──────────────────┐                 │ ┌──────────────────┐
//! bytes │  FileMetaData*   │ │             │  FileMetaData*   │  │
//! (thrift encoded) │ └──────────────────┘ │ └──────────────────┘
//!  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
//!
//!        format::meta structures            file::metadata structures
//!
//! * Same name, different struct
//! ```
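//!
//! # Example: converting between `format` and `file::metadata`
//!
//! Each structure in this module provides conversions to and from its [`format`]
//! counterpart. A minimal sketch of round-tripping a [`RowGroupMetaData`] through
//! the low-level thrift structure; the `get_schema_descr` and
//! `get_row_group_metadata` helpers are placeholders for application code:
//!
//! ```no_run
//! # use parquet::file::metadata::RowGroupMetaData;
//! # use parquet::schema::types::SchemaDescPtr;
//! # fn get_schema_descr() -> SchemaDescPtr { unimplemented!() }
//! # fn get_row_group_metadata() -> RowGroupMetaData { unimplemented!() }
//! let row_group = get_row_group_metadata();
//! // convert to the generated thrift structure ...
//! let thrift_row_group = row_group.to_thrift();
//! // ... and back, given the schema descriptor
//! let row_group = RowGroupMetaData::from_thrift(get_schema_descr(), thrift_row_group).unwrap();
//! ```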
mod memory;
pub(crate) mod reader;
mod writer;

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
#[cfg(feature = "encryption")]
use crate::encryption::{
    decrypt::FileDecryptor,
    modules::{create_module_aad, ModuleType},
};
use crate::errors::{ParquetError, Result};
#[cfg(feature = "encryption")]
use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{self, Statistics};
use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData;
use crate::format::{
    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
    SizeStatistics, SortingColumn,
};
use crate::schema::types::{
    ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
    Type as SchemaType,
};
#[cfg(feature = "encryption")]
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
pub use reader::{FooterTail, ParquetMetaDataReader};
use std::ops::Range;
use std::sync::Arc;
pub use writer::ParquetMetaDataWriter;
pub(crate) use writer::ThriftMetadataWriter;

/// Page level statistics for each column chunk of each row group.
///
/// This structure is an in-memory representation of multiple [`ColumnIndex`]
/// structures in a parquet file footer, as described in the Parquet [PageIndex
/// documentation]. Each [`Index`] holds statistics about all the pages in a
/// particular column chunk.
///
/// `column_index[row_group_number][column_number]` holds the
/// [`Index`] corresponding to column `column_number` of row group
/// `row_group_number`.
///
/// For example `column_index[2][3]` holds the [`Index`] for the fourth
/// column in the third row group of the parquet file.
///
/// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub type ParquetColumnIndex = Vec<Vec<Index>>;

/// [`OffsetIndexMetaData`] for each data page of each row group of each column
///
/// This structure is the parsed representation of the [`OffsetIndex`] from the
/// Parquet file footer, as described in the Parquet [PageIndex documentation].
///
/// `offset_index[row_group_number][column_number]` holds
/// the [`OffsetIndexMetaData`] corresponding to column
/// `column_number` of row group `row_group_number`.
///
/// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;

/// Parsed metadata for a single Parquet file
///
/// This structure is stored in the footer of Parquet files, in the format
/// defined by [`parquet.thrift`].
/// /// # Overview /// The fields of this structure are: /// * [`FileMetaData`]: Information about the overall file (such as the schema) (See [`Self::file_metadata`]) /// * [`RowGroupMetaData`]: Information about each Row Group (see [`Self::row_groups`]) /// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`]) /// /// This structure is read by the various readers in this crate or can be read /// directly from a file using the [`ParquetMetaDataReader`] struct. /// /// See the [`ParquetMetaDataBuilder`] to create and modify this structure. /// /// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift #[derive(Debug, Clone, PartialEq)] pub struct ParquetMetaData { /// File level metadata file_metadata: FileMetaData, /// Row group metadata row_groups: Vec<RowGroupMetaData>, /// Page level index for each page in each column chunk column_index: Option<ParquetColumnIndex>, /// Offset index for each page in each column chunk offset_index: Option<ParquetOffsetIndex>, /// Optional file decryptor #[cfg(feature = "encryption")] file_decryptor: Option<FileDecryptor>, } impl ParquetMetaData { /// Creates Parquet metadata from file metadata and a list of row /// group metadata pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self { ParquetMetaData { file_metadata, row_groups, #[cfg(feature = "encryption")] file_decryptor: None, column_index: None, offset_index: None, } } /// Adds [`FileDecryptor`] to this metadata instance to enable decryption of /// encrypted data. #[cfg(feature = "encryption")] pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) { self.file_decryptor = file_decryptor; } /// Creates Parquet metadata from file metadata, a list of row /// group metadata, and the column index structures. #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataBuilder")] pub fn new_with_page_index( file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>, column_index: Option<ParquetColumnIndex>, offset_index: Option<ParquetOffsetIndex>, ) -> Self { ParquetMetaDataBuilder::new(file_metadata) .set_row_groups(row_groups) .set_column_index(column_index) .set_offset_index(offset_index) .build() } /// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`] pub fn into_builder(self) -> ParquetMetaDataBuilder { self.into() } /// Returns file metadata as reference. pub fn file_metadata(&self) -> &FileMetaData { &self.file_metadata } /// Returns file decryptor as reference. #[cfg(feature = "encryption")] pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> { self.file_decryptor.as_ref() } /// Returns number of row groups in this file. pub fn num_row_groups(&self) -> usize { self.row_groups.len() } /// Returns row group metadata for `i`th position. /// Position should be less than number of row groups `num_row_groups`. pub fn row_group(&self, i: usize) -> &RowGroupMetaData { &self.row_groups[i] } /// Returns slice of row groups in this file. pub fn row_groups(&self) -> &[RowGroupMetaData] { &self.row_groups } /// Returns the column index for this file if loaded /// /// Returns `None` if the parquet file does not have a `ColumnIndex` or /// [ArrowReaderOptions::with_page_index] was set to false. 
    ///
    /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
    pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
        self.column_index.as_ref()
    }

    /// Returns offset indexes in this file, if loaded
    ///
    /// Returns `None` if the parquet file does not have an `OffsetIndex` or
    /// [ArrowReaderOptions::with_page_index] was set to false.
    ///
    /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
    pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
        self.offset_index.as_ref()
    }

    /// Estimate of the bytes allocated to store `ParquetMetaData`
    ///
    /// # Notes:
    ///
    /// 1. Includes size of self
    ///
    /// 2. Includes heap memory for sub fields such as [`FileMetaData`] and
    ///    [`RowGroupMetaData`].
    ///
    /// 3. Includes memory from shared pointers (e.g. [`SchemaDescPtr`]). This
    ///    means `memory_size` will overestimate the memory size if such pointers
    ///    are shared.
    ///
    /// 4. Does not include any allocator overheads
    pub fn memory_size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.file_metadata.heap_size()
            + self.row_groups.heap_size()
            + self.column_index.heap_size()
            + self.offset_index.heap_size()
    }

    /// Override the column index
    pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
        self.column_index = index;
    }

    /// Override the offset index
    pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
        self.offset_index = index;
    }
}

/// A builder for creating / manipulating [`ParquetMetaData`]
///
/// # Example creating a new [`ParquetMetaData`]
///
/// ```no_run
/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData, RowGroupMetaDataBuilder};
/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); }
/// // Create a new builder given the file metadata
/// let file_metadata = get_file_metadata();
/// // Create a row group
/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr())
///    .set_num_rows(100)
///    // ...
(A real row group needs more than just the number of rows) /// .build() /// .unwrap(); /// // Create the final metadata /// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata) /// .add_row_group(row_group) /// .build(); /// ``` /// /// # Example modifying an existing [`ParquetMetaData`] /// ```no_run /// # use parquet::file::metadata::ParquetMetaData; /// # fn load_metadata() -> ParquetMetaData { unimplemented!(); } /// // Modify the metadata so only the last RowGroup remains /// let metadata: ParquetMetaData = load_metadata(); /// let mut builder = metadata.into_builder(); /// /// // Take existing row groups to modify /// let mut row_groups = builder.take_row_groups(); /// let last_row_group = row_groups.pop().unwrap(); /// /// let metadata = builder /// .add_row_group(last_row_group) /// .build(); /// ``` pub struct ParquetMetaDataBuilder(ParquetMetaData); impl ParquetMetaDataBuilder { /// Create a new builder from a file metadata, with no row groups pub fn new(file_meta_data: FileMetaData) -> Self { Self(ParquetMetaData::new(file_meta_data, vec![])) } /// Create a new builder from an existing ParquetMetaData pub fn new_from_metadata(metadata: ParquetMetaData) -> Self { Self(metadata) } /// Adds a row group to the metadata pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self { self.0.row_groups.push(row_group); self } /// Sets all the row groups to the specified list pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self { self.0.row_groups = row_groups; self } /// Takes ownership of the row groups in this builder, and clears the list /// of row groups. /// /// This can be used for more efficient creation of a new ParquetMetaData /// from an existing one. pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> { std::mem::take(&mut self.0.row_groups) } /// Return a reference to the current row groups pub fn row_groups(&self) -> &[RowGroupMetaData] { &self.0.row_groups } /// Sets the column index pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self { self.0.column_index = column_index; self } /// Returns the current column index from the builder, replacing it with `None` pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> { std::mem::take(&mut self.0.column_index) } /// Return a reference to the current column index, if any pub fn column_index(&self) -> Option<&ParquetColumnIndex> { self.0.column_index.as_ref() } /// Sets the offset index pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self { self.0.offset_index = offset_index; self } /// Returns the current offset index from the builder, replacing it with `None` pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> { std::mem::take(&mut self.0.offset_index) } /// Return a reference to the current offset index, if any pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> { self.0.offset_index.as_ref() } /// Creates a new ParquetMetaData from the builder pub fn build(self) -> ParquetMetaData { let Self(metadata) = self; metadata } } impl From<ParquetMetaData> for ParquetMetaDataBuilder { fn from(meta_data: ParquetMetaData) -> Self { Self(meta_data) } } /// A key-value pair for [`FileMetaData`]. pub type KeyValue = crate::format::KeyValue; /// Reference counted pointer for [`FileMetaData`]. pub type FileMetaDataPtr = Arc<FileMetaData>; /// File level metadata for a Parquet file. 
///
/// Includes the version of the file, metadata, number of rows, schema, and column orders
#[derive(Debug, Clone, PartialEq)]
pub struct FileMetaData {
    version: i32,
    num_rows: i64,
    created_by: Option<String>,
    key_value_metadata: Option<Vec<KeyValue>>,
    schema_descr: SchemaDescPtr,
    column_orders: Option<Vec<ColumnOrder>>,
}

impl FileMetaData {
    /// Creates new file metadata.
    pub fn new(
        version: i32,
        num_rows: i64,
        created_by: Option<String>,
        key_value_metadata: Option<Vec<KeyValue>>,
        schema_descr: SchemaDescPtr,
        column_orders: Option<Vec<ColumnOrder>>,
    ) -> Self {
        FileMetaData {
            version,
            num_rows,
            created_by,
            key_value_metadata,
            schema_descr,
            column_orders,
        }
    }

    /// Returns version of this file.
    pub fn version(&self) -> i32 {
        self.version
    }

    /// Returns number of rows in the file.
    pub fn num_rows(&self) -> i64 {
        self.num_rows
    }

    /// String message for application that wrote this file.
    ///
    /// This should have the following format:
    /// `<application> version <application version> (build <application build hash>)`.
    ///
    /// ```shell
    /// parquet-mr version 1.8.0 (build 0fda28af84b9746396014ad6a415b90592a98b3b)
    /// ```
    pub fn created_by(&self) -> Option<&str> {
        self.created_by.as_deref()
    }

    /// Returns key_value_metadata of this file.
    pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
        self.key_value_metadata.as_ref()
    }

    /// Returns Parquet [`Type`] that describes schema in this file.
    ///
    /// [`Type`]: crate::schema::types::Type
    pub fn schema(&self) -> &SchemaType {
        self.schema_descr.root_schema()
    }

    /// Returns a reference to schema descriptor.
    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.schema_descr
    }

    /// Returns reference counted clone for schema descriptor.
    pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
        self.schema_descr.clone()
    }

    /// Column (sort) order used for `min` and `max` values of each column in this file.
    ///
    /// Each column order corresponds to one column, determined by its position in the
    /// list, matching the position of the column in the schema.
    ///
    /// When `None` is returned, there are no column orders available, and each column
    /// should be assumed to have undefined (legacy) column order.
    pub fn column_orders(&self) -> Option<&Vec<ColumnOrder>> {
        self.column_orders.as_ref()
    }

    /// Returns column order for `i`th column in this file.
    /// If column orders are not available, returns undefined (legacy) column order.
    pub fn column_order(&self, i: usize) -> ColumnOrder {
        self.column_orders
            .as_ref()
            .map(|data| data[i])
            .unwrap_or(ColumnOrder::UNDEFINED)
    }
}

/// Reference counted pointer for [`RowGroupMetaData`].
pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;

/// Metadata for a row group
///
/// Includes [`ColumnChunkMetaData`] for each column in the row group, the number of rows,
/// the total byte size of the row group, and the [`SchemaDescriptor`] for the row group.
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupMetaData {
    columns: Vec<ColumnChunkMetaData>,
    num_rows: i64,
    sorting_columns: Option<Vec<SortingColumn>>,
    total_byte_size: i64,
    schema_descr: SchemaDescPtr,
    /// We can't infer this from the file offset of the first column since there
    /// may be empty columns in the row group.
    file_offset: Option<i64>,
    /// Ordinal position of this row group in file
    ordinal: Option<i16>,
}

impl RowGroupMetaData {
    /// Returns builder for row group metadata.
    pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder::new(schema_descr)
    }

    /// Number of columns in this row group.
pub fn num_columns(&self) -> usize { self.columns.len() } /// Returns column chunk metadata for `i`th column. pub fn column(&self, i: usize) -> &ColumnChunkMetaData { &self.columns[i] } /// Returns slice of column chunk metadata. pub fn columns(&self) -> &[ColumnChunkMetaData] { &self.columns } /// Returns mutable slice of column chunk metadata. pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] { &mut self.columns } /// Number of rows in this row group. pub fn num_rows(&self) -> i64 { self.num_rows } /// Returns the sort ordering of the rows in this RowGroup if any pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> { self.sorting_columns.as_ref() } /// Total byte size of all uncompressed column data in this row group. pub fn total_byte_size(&self) -> i64 { self.total_byte_size } /// Total size of all compressed column data in this row group. pub fn compressed_size(&self) -> i64 { self.columns.iter().map(|c| c.total_compressed_size).sum() } /// Returns reference to a schema descriptor. pub fn schema_descr(&self) -> &SchemaDescriptor { self.schema_descr.as_ref() } /// Returns reference counted clone of schema descriptor. pub fn schema_descr_ptr(&self) -> SchemaDescPtr { self.schema_descr.clone() } /// Returns ordinal position of this row group in file. /// /// For example if this is the first row group in the file, this will return 0. /// If this is the second row group in the file, this will return 1. #[inline(always)] pub fn ordinal(&self) -> Option<i16> { self.ordinal } /// Returns file offset of this row group in file. #[inline(always)] pub fn file_offset(&self) -> Option<i64> { self.file_offset } /// Method to convert from encrypted Thrift. #[cfg(feature = "encryption")] fn from_encrypted_thrift( schema_descr: SchemaDescPtr, mut rg: RowGroup, decryptor: Option<&FileDecryptor>, ) -> Result<RowGroupMetaData> { if schema_descr.num_columns() != rg.columns.len() { return Err(general_err!( "Column count mismatch. Schema has {} columns while Row Group has {}", schema_descr.num_columns(), rg.columns.len() )); } let total_byte_size = rg.total_byte_size; let num_rows = rg.num_rows; let mut columns = vec![]; for (i, (mut c, d)) in rg .columns .drain(0..) .zip(schema_descr.columns()) .enumerate() { // Read encrypted metadata if it's present and we have a decryptor. if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) { let column_decryptor = match c.crypto_metadata.as_ref() { None => { return Err(general_err!( "No crypto_metadata is set for column '{}', which has encrypted metadata", d.path().string() )); } Some(TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => { let column_name = crypto_metadata.path_in_schema.join("."); decryptor.get_column_metadata_decryptor( column_name.as_str(), crypto_metadata.key_metadata.as_deref(), )? } Some(TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => { decryptor.get_footer_decryptor()? 
                }
            };

            let column_aad = create_module_aad(
                decryptor.file_aad(),
                ModuleType::ColumnMetaData,
                rg.ordinal.unwrap() as usize,
                i,
                None,
            )?;

            let buf = c.encrypted_column_metadata.clone().unwrap();
            let decrypted_cc_buf = column_decryptor
                .decrypt(buf.as_slice(), column_aad.as_ref())
                .map_err(|_| {
                    general_err!(
                        "Unable to decrypt column '{}', perhaps the column key is wrong?",
                        d.path().string()
                    )
                })?;

            let mut prot = TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice());
            c.meta_data = Some(ColumnMetaData::read_from_in_protocol(&mut prot)?);
            }
            columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
        }

        let sorting_columns = rg.sorting_columns;
        Ok(RowGroupMetaData {
            columns,
            num_rows,
            sorting_columns,
            total_byte_size,
            schema_descr,
            file_offset: rg.file_offset,
            ordinal: rg.ordinal,
        })
    }

    /// Method to convert from Thrift.
    pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result<RowGroupMetaData> {
        if schema_descr.num_columns() != rg.columns.len() {
            return Err(general_err!(
                "Column count mismatch. Schema has {} columns while Row Group has {}",
                schema_descr.num_columns(),
                rg.columns.len()
            ));
        }
        let total_byte_size = rg.total_byte_size;
        let num_rows = rg.num_rows;
        let mut columns = vec![];
        for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
            columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
        }
        let sorting_columns = rg.sorting_columns;
        Ok(RowGroupMetaData {
            columns,
            num_rows,
            sorting_columns,
            total_byte_size,
            schema_descr,
            file_offset: rg.file_offset,
            ordinal: rg.ordinal,
        })
    }

    /// Method to convert to Thrift.
    pub fn to_thrift(&self) -> RowGroup {
        RowGroup {
            columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
            total_byte_size: self.total_byte_size,
            num_rows: self.num_rows,
            sorting_columns: self.sorting_columns().cloned(),
            file_offset: self.file_offset(),
            total_compressed_size: Some(self.compressed_size()),
            ordinal: self.ordinal,
        }
    }

    /// Converts this [`RowGroupMetaData`] into a [`RowGroupMetaDataBuilder`]
    pub fn into_builder(self) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder(self)
    }
}

/// Builder for row group metadata.
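///
/// # Example
///
/// A minimal sketch of building row group metadata; the `get_schema_descr` and
/// `get_column_chunks` helpers are placeholders for application code, and
/// [`RowGroupMetaDataBuilder::build`] returns an error unless one
/// [`ColumnChunkMetaData`] is supplied for each column in the schema:
///
/// ```no_run
/// # use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
/// # use parquet::schema::types::SchemaDescPtr;
/// # fn get_schema_descr() -> SchemaDescPtr { unimplemented!() }
/// # fn get_column_chunks() -> Vec<ColumnChunkMetaData> { unimplemented!() }
/// let row_group = RowGroupMetaData::builder(get_schema_descr())
///     .set_num_rows(100)
///     .set_total_byte_size(1024)
///     // one ColumnChunkMetaData for each column in the schema
///     .set_column_metadata(get_column_chunks())
///     .build()
///     .unwrap();
/// ```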
pub struct RowGroupMetaDataBuilder(RowGroupMetaData);

impl RowGroupMetaDataBuilder {
    /// Creates new builder from schema descriptor.
    fn new(schema_descr: SchemaDescPtr) -> Self {
        Self(RowGroupMetaData {
            columns: Vec::with_capacity(schema_descr.num_columns()),
            schema_descr,
            file_offset: None,
            num_rows: 0,
            sorting_columns: None,
            total_byte_size: 0,
            ordinal: None,
        })
    }

    /// Sets number of rows in this row group.
    pub fn set_num_rows(mut self, value: i64) -> Self {
        self.0.num_rows = value;
        self
    }

    /// Sets the sorting order for columns
    pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
        self.0.sorting_columns = value;
        self
    }

    /// Sets total size in bytes for this row group.
    pub fn set_total_byte_size(mut self, value: i64) -> Self {
        self.0.total_byte_size = value;
        self
    }

    /// Takes ownership of the column metadata in this builder, and clears
    /// the list of columns.
    ///
    /// This can be used for more efficient creation of a new RowGroupMetaData
    /// from an existing one.
    pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
        std::mem::take(&mut self.0.columns)
    }

    /// Sets column metadata for this row group.
    pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
        self.0.columns = value;
        self
    }

    /// Adds a column metadata to this row group
    pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
        self.0.columns.push(value);
        self
    }

    /// Sets ordinal for this row group.
    pub fn set_ordinal(mut self, value: i16) -> Self {
        self.0.ordinal = Some(value);
        self
    }

    /// Sets file offset for this row group.
    pub fn set_file_offset(mut self, value: i64) -> Self {
        self.0.file_offset = Some(value);
        self
    }

    /// Builds row group metadata.
    pub fn build(self) -> Result<RowGroupMetaData> {
        if self.0.schema_descr.num_columns() != self.0.columns.len() {
            return Err(general_err!(
                "Column length mismatch: {} != {}",
                self.0.schema_descr.num_columns(),
                self.0.columns.len()
            ));
        }
        Ok(self.0)
    }
}

/// Metadata for a column chunk.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
    column_descr: ColumnDescPtr,
    encodings: Vec<Encoding>,
    file_path: Option<String>,
    file_offset: i64,
    num_values: i64,
    compression: Compression,
    total_compressed_size: i64,
    total_uncompressed_size: i64,
    data_page_offset: i64,
    index_page_offset: Option<i64>,
    dictionary_page_offset: Option<i64>,
    statistics: Option<Statistics>,
    encoding_stats: Option<Vec<PageEncodingStats>>,
    bloom_filter_offset: Option<i64>,
    bloom_filter_length: Option<i32>,
    offset_index_offset: Option<i64>,
    offset_index_length: Option<i32>,
    column_index_offset: Option<i64>,
    column_index_length: Option<i32>,
    unencoded_byte_array_data_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
    #[cfg(feature = "encryption")]
    column_crypto_metadata: Option<ColumnCryptoMetaData>,
}

/// Histograms for repetition and definition levels.
///
/// Each histogram is a vector of length `max_level + 1`. The value at index `i` is the number of
/// values at level `i`.
///
/// For example, `vec[0]` is the number of rows with level 0, `vec[1]` is the
/// number of rows with level 1, and so on.
///
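/// # Example
///
/// A minimal sketch of building and updating a histogram:
///
/// ```
/// # use parquet::file::metadata::LevelHistogram;
/// // a histogram for levels 0..=2 (max_level = 2)
/// let mut hist = LevelHistogram::try_new(2).unwrap();
/// hist.update_from_levels(&[0, 2, 2]);
/// assert_eq!(hist.values(), &[1, 0, 2]);
/// ```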
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct LevelHistogram {
    inner: Vec<i64>,
}

impl LevelHistogram {
    /// Creates a new level histogram.
    ///
    /// Length will be `max_level + 1`.
    ///
    /// Returns `None` when `max_level == 0` (because histograms are not necessary in this case)
    pub fn try_new(max_level: i16) -> Option<Self> {
        if max_level > 0 {
            Some(Self {
                inner: vec![0; max_level as usize + 1],
            })
        } else {
            None
        }
    }

    /// Returns a reference to the histogram's values.
    pub fn values(&self) -> &[i64] {
        &self.inner
    }

    /// Return the inner vector, consuming self
    pub fn into_inner(self) -> Vec<i64> {
        self.inner
    }

    /// Returns the histogram value at the given index.
    ///
    /// The value of `i` is the number of values with level `i`. For example,
    /// `get(1)` returns the number of values with level 1.
    ///
    /// Returns `None` if the index is out of bounds.
    pub fn get(&self, index: usize) -> Option<i64> {
        self.inner.get(index).copied()
    }

    /// Adds the values from the other histogram to this histogram
    ///
    /// # Panics
    /// If the histograms have different lengths
    pub fn add(&mut self, other: &Self) {
        assert_eq!(self.len(), other.len());
        for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) {
            *dst += src;
        }
    }

    /// return the length of the histogram
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// returns if the histogram is empty
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Sets the values of all histogram levels to 0.
    pub fn reset(&mut self) {
        for value in self.inner.iter_mut() {
            *value = 0;
        }
    }

    /// Updates histogram values using provided repetition levels
    ///
    /// # Panics
    /// if any of the levels is greater than `max_level` (the argument supplied
    /// to [`Self::try_new`])
    pub fn update_from_levels(&mut self, levels: &[i16]) {
        for &level in levels {
            self.inner[level as usize] += 1;
        }
    }
}

impl From<Vec<i64>> for LevelHistogram {
    fn from(inner: Vec<i64>) -> Self {
        Self { inner }
    }
}

impl From<LevelHistogram> for Vec<i64> {
    fn from(value: LevelHistogram) -> Self {
        value.into_inner()
    }
}

impl HeapSize for LevelHistogram {
    fn heap_size(&self) -> usize {
        self.inner.heap_size()
    }
}

/// Represents common operations for a column chunk.
impl ColumnChunkMetaData {
    /// Returns builder for column chunk metadata.
    pub fn builder(column_descr: ColumnDescPtr) -> ColumnChunkMetaDataBuilder {
        ColumnChunkMetaDataBuilder::new(column_descr)
    }

    /// File where the column chunk is stored.
    ///
    /// If not set, assumed to belong to the same file as the metadata.
    /// This path is relative to the current file.
    pub fn file_path(&self) -> Option<&str> {
        self.file_path.as_deref()
    }

    /// Byte offset of `ColumnMetaData` in `file_path()`.
    ///
    /// Note that the meaning of this field has been inconsistent between implementations
    /// so its use has since been deprecated in the Parquet specification. Modern implementations
    /// will set this to `0` to indicate that the `ColumnMetaData` is solely contained in the
    /// `ColumnChunk` struct.
    pub fn file_offset(&self) -> i64 {
        self.file_offset
    }

    /// Type of this column. Must be primitive.
    pub fn column_type(&self) -> Type {
        self.column_descr.physical_type()
    }

    /// Path (or identifier) of this column.
    pub fn column_path(&self) -> &ColumnPath {
        self.column_descr.path()
    }

    /// Descriptor for this column.
    pub fn column_descr(&self) -> &ColumnDescriptor {
        self.column_descr.as_ref()
    }

    /// Reference counted clone of descriptor for this column.
    pub fn column_descr_ptr(&self) -> ColumnDescPtr {
        self.column_descr.clone()
    }

    /// All encodings used for this column.
    pub fn encodings(&self) -> &Vec<Encoding> {
        &self.encodings
    }

    /// Total number of values in this column chunk.
    pub fn num_values(&self) -> i64 {
        self.num_values
    }

    /// Compression for this column.
    pub fn compression(&self) -> Compression {
        self.compression
    }

    /// Returns the total compressed data size of this column chunk.
    pub fn compressed_size(&self) -> i64 {
        self.total_compressed_size
    }

    /// Returns the total uncompressed data size of this column chunk.
    pub fn uncompressed_size(&self) -> i64 {
        self.total_uncompressed_size
    }

    /// Returns the offset for the column data.
    pub fn data_page_offset(&self) -> i64 {
        self.data_page_offset
    }

    /// Returns the offset for the index page.
    pub fn index_page_offset(&self) -> Option<i64> {
        self.index_page_offset
    }

    /// Returns the offset for the dictionary page, if any.
    pub fn dictionary_page_offset(&self) -> Option<i64> {
        self.dictionary_page_offset
    }

    /// Returns the offset and length in bytes of the column chunk within the file
    pub fn byte_range(&self) -> (u64, u64) {
        let col_start = match self.dictionary_page_offset() {
            Some(dictionary_page_offset) => dictionary_page_offset,
            None => self.data_page_offset(),
        };
        let col_len = self.compressed_size();
        assert!(
            col_start >= 0 && col_len >= 0,
            "column start and length should not be negative"
        );
        (col_start as u64, col_len as u64)
    }

    /// Returns statistics that are set for this column chunk,
    /// or `None` if no statistics are available.
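    ///
    /// A sketch of typical use (the `get_column_chunk_metadata` helper is
    /// hypothetical); [`Statistics`] is an enum, so the accessors available
    /// depend on the physical type of the column:
    ///
    /// ```no_run
    /// # use parquet::file::metadata::ColumnChunkMetaData;
    /// # fn get_column_chunk_metadata() -> ColumnChunkMetaData { unimplemented!() }
    /// let chunk = get_column_chunk_metadata();
    /// // statistics are optional; writers may omit them entirely
    /// if let Some(stats) = chunk.statistics() {
    ///     println!("statistics for {}: {stats:?}", chunk.column_path().string());
    /// }
    /// ```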
    pub fn statistics(&self) -> Option<&Statistics> {
        self.statistics.as_ref()
    }

    /// Returns the page encoding stats for this column chunk,
    /// or `None` if no page encoding stats are available.
    pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
        self.encoding_stats.as_ref()
    }

    /// Returns the offset for the bloom filter.
    pub fn bloom_filter_offset(&self) -> Option<i64> {
        self.bloom_filter_offset
    }

    /// Returns the length of the bloom filter in bytes.
    pub fn bloom_filter_length(&self) -> Option<i32> {
        self.bloom_filter_length
    }

    /// Returns the offset for the column index.
    pub fn column_index_offset(&self) -> Option<i64> {
        self.column_index_offset
    }

    /// Returns the length of the column index in bytes.
    pub fn column_index_length(&self) -> Option<i32> {
        self.column_index_length
    }

    /// Returns the range for the column index if any
    pub(crate) fn column_index_range(&self) -> Option<Range<u64>> {
        let offset = u64::try_from(self.column_index_offset?).ok()?;
        let length = u64::try_from(self.column_index_length?).ok()?;
        Some(offset..(offset + length))
    }

    /// Returns the offset for the offset index.
    pub fn offset_index_offset(&self) -> Option<i64> {
        self.offset_index_offset
    }

    /// Returns the length of the offset index in bytes.
    pub fn offset_index_length(&self) -> Option<i32> {
        self.offset_index_length
    }

    /// Returns the range for the offset index if any
    pub(crate) fn offset_index_range(&self) -> Option<Range<u64>> {
        let offset = u64::try_from(self.offset_index_offset?).ok()?;
        let length = u64::try_from(self.offset_index_length?).ok()?;
        Some(offset..(offset + length))
    }

    /// Returns the number of bytes of variable length data after decoding.
    ///
    /// Only set for BYTE_ARRAY columns. This field may not be set by older
    /// writers.
    pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
        self.unencoded_byte_array_data_bytes
    }

    /// Returns the repetition level histogram.
    ///
    /// The returned value `vec[i]` is how many values are at repetition level `i`. For example,
    /// `vec[0]` indicates how many rows the page contains.
    /// This field may not be set by older writers.
    pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
        self.repetition_level_histogram.as_ref()
    }

    /// Returns the definition level histogram.
    ///
    /// The returned value `vec[i]` is how many values are at definition level `i`. For example,
    /// `vec[max_definition_level]` indicates how many non-null values are present in the page.
    /// This field may not be set by older writers.
    pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
        self.definition_level_histogram.as_ref()
    }

    /// Returns the encryption metadata for this column chunk.
    #[cfg(feature = "encryption")]
    pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
        self.column_crypto_metadata.as_ref()
    }

    /// Method to convert from Thrift.
    pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
        if cc.meta_data.is_none() {
            return Err(general_err!("Expected to have column metadata"));
        }
        let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
        let column_type = Type::try_from(col_metadata.type_)?;
        let encodings = col_metadata
            .encodings
            .drain(0..)
.map(Encoding::try_from) .collect::<Result<_>>()?; let compression = Compression::try_from(col_metadata.codec)?; let file_path = cc.file_path; let file_offset = cc.file_offset; let num_values = col_metadata.num_values; let total_compressed_size = col_metadata.total_compressed_size; let total_uncompressed_size = col_metadata.total_uncompressed_size; let data_page_offset = col_metadata.data_page_offset; let index_page_offset = col_metadata.index_page_offset; let dictionary_page_offset = col_metadata.dictionary_page_offset; let statistics = statistics::from_thrift(column_type, col_metadata.statistics)?; let encoding_stats = col_metadata .encoding_stats .as_ref() .map(|vec| { vec.iter() .map(page_encoding_stats::try_from_thrift) .collect::<Result<_>>() }) .transpose()?; let bloom_filter_offset = col_metadata.bloom_filter_offset; let bloom_filter_length = col_metadata.bloom_filter_length; let offset_index_offset = cc.offset_index_offset; let offset_index_length = cc.offset_index_length; let column_index_offset = cc.column_index_offset; let column_index_length = cc.column_index_length; let ( unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram, ) = if let Some(size_stats) = col_metadata.size_statistics { ( size_stats.unencoded_byte_array_data_bytes, size_stats.repetition_level_histogram, size_stats.definition_level_histogram, ) } else { (None, None, None) }; let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from); let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from); #[cfg(feature = "encryption")] let column_crypto_metadata = if let Some(crypto_metadata) = cc.crypto_metadata { Some(column_crypto_metadata::try_from_thrift(&crypto_metadata)?) } else { None }; let result = ColumnChunkMetaData { column_descr, encodings, file_path, file_offset, num_values, compression, total_compressed_size, total_uncompressed_size, data_page_offset, index_page_offset, dictionary_page_offset, statistics, encoding_stats, bloom_filter_offset, bloom_filter_length, offset_index_offset, offset_index_length, column_index_offset, column_index_length, unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram, #[cfg(feature = "encryption")] column_crypto_metadata, }; Ok(result) } /// Method to convert to Thrift. 
pub fn to_thrift(&self) -> ColumnChunk { let column_metadata = self.to_column_metadata_thrift(); ColumnChunk { file_path: self.file_path().map(|s| s.to_owned()), file_offset: self.file_offset, meta_data: Some(column_metadata), offset_index_offset: self.offset_index_offset, offset_index_length: self.offset_index_length, column_index_offset: self.column_index_offset, column_index_length: self.column_index_length, crypto_metadata: self.column_crypto_metadata_thrift(), encrypted_column_metadata: None, } } /// Method to convert to Thrift `ColumnMetaData` pub fn to_column_metadata_thrift(&self) -> ColumnMetaData { let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() || self.repetition_level_histogram.is_some() || self.definition_level_histogram.is_some() { let repetition_level_histogram = self .repetition_level_histogram .as_ref() .map(|hist| hist.clone().into_inner()); let definition_level_histogram = self .definition_level_histogram .as_ref() .map(|hist| hist.clone().into_inner()); Some(SizeStatistics { unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram, }) } else { None }; ColumnMetaData { type_: self.column_type().into(), encodings: self.encodings().iter().map(|&v| v.into()).collect(), path_in_schema: self.column_path().as_ref().to_vec(), codec: self.compression.into(), num_values: self.num_values, total_uncompressed_size: self.total_uncompressed_size, total_compressed_size: self.total_compressed_size, key_value_metadata: None, data_page_offset: self.data_page_offset, index_page_offset: self.index_page_offset, dictionary_page_offset: self.dictionary_page_offset, statistics: statistics::to_thrift(self.statistics.as_ref()), encoding_stats: self .encoding_stats .as_ref() .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()), bloom_filter_offset: self.bloom_filter_offset, bloom_filter_length: self.bloom_filter_length, size_statistics, } } /// Converts this [`ColumnChunkMetaData`] into a [`ColumnChunkMetaDataBuilder`] pub fn into_builder(self) -> ColumnChunkMetaDataBuilder { ColumnChunkMetaDataBuilder::from(self) } #[cfg(feature = "encryption")] fn column_crypto_metadata_thrift(&self) -> Option<TColumnCryptoMetaData> { self.column_crypto_metadata .as_ref() .map(column_crypto_metadata::to_thrift) } #[cfg(not(feature = "encryption"))] fn column_crypto_metadata_thrift(&self) -> Option<TColumnCryptoMetaData> { None } } /// Builder for [`ColumnChunkMetaData`] /// /// This builder is used to create a new column chunk metadata or modify an /// existing one. /// /// # Example /// ```no_run /// # use parquet::file::metadata::{ColumnChunkMetaData, ColumnChunkMetaDataBuilder}; /// # fn get_column_chunk_metadata() -> ColumnChunkMetaData { unimplemented!(); } /// let column_chunk_metadata = get_column_chunk_metadata(); /// // create a new builder from existing column chunk metadata /// let builder = ColumnChunkMetaDataBuilder::from(column_chunk_metadata); /// // clear the statistics: /// let column_chunk_metadata: ColumnChunkMetaData = builder /// .clear_statistics() /// .build() /// .unwrap(); /// ``` pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData); impl ColumnChunkMetaDataBuilder { /// Creates new column chunk metadata builder. 
    ///
    /// See also [`ColumnChunkMetaData::builder`]
    fn new(column_descr: ColumnDescPtr) -> Self {
        Self(ColumnChunkMetaData {
            column_descr,
            encodings: Vec::new(),
            file_path: None,
            file_offset: 0,
            num_values: 0,
            compression: Compression::UNCOMPRESSED,
            total_compressed_size: 0,
            total_uncompressed_size: 0,
            data_page_offset: 0,
            index_page_offset: None,
            dictionary_page_offset: None,
            statistics: None,
            encoding_stats: None,
            bloom_filter_offset: None,
            bloom_filter_length: None,
            offset_index_offset: None,
            offset_index_length: None,
            column_index_offset: None,
            column_index_length: None,
            unencoded_byte_array_data_bytes: None,
            repetition_level_histogram: None,
            definition_level_histogram: None,
            #[cfg(feature = "encryption")]
            column_crypto_metadata: None,
        })
    }

    /// Sets list of encodings for this column chunk.
    pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
        self.0.encodings = encodings;
        self
    }

    /// Sets optional file path for this column chunk.
    pub fn set_file_path(mut self, value: String) -> Self {
        self.0.file_path = Some(value);
        self
    }

    /// Sets file offset in bytes.
    ///
    /// This field was meant to provide an alternative to storing `ColumnMetadata` directly in
    /// the `ColumnChunkMetadata`. However, most Parquet readers assume the `ColumnMetadata`
    /// is stored inline and ignore this field.
    #[deprecated(
        since = "53.0.0",
        note = "The Parquet specification requires this field to be 0"
    )]
    pub fn set_file_offset(mut self, value: i64) -> Self {
        self.0.file_offset = value;
        self
    }

    /// Sets number of values.
    pub fn set_num_values(mut self, value: i64) -> Self {
        self.0.num_values = value;
        self
    }

    /// Sets compression.
    pub fn set_compression(mut self, value: Compression) -> Self {
        self.0.compression = value;
        self
    }

    /// Sets total compressed size in bytes.
    pub fn set_total_compressed_size(mut self, value: i64) -> Self {
        self.0.total_compressed_size = value;
        self
    }

    /// Sets total uncompressed size in bytes.
    pub fn set_total_uncompressed_size(mut self, value: i64) -> Self {
        self.0.total_uncompressed_size = value;
        self
    }

    /// Sets data page offset in bytes.
    pub fn set_data_page_offset(mut self, value: i64) -> Self {
        self.0.data_page_offset = value;
        self
    }

    /// Sets optional dictionary page offset in bytes.
    pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
        self.0.dictionary_page_offset = value;
        self
    }

    /// Sets optional index page offset in bytes.
    pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
        self.0.index_page_offset = value;
        self
    }

    /// Sets statistics for this column chunk.
    pub fn set_statistics(mut self, value: Statistics) -> Self {
        self.0.statistics = Some(value);
        self
    }

    /// Clears the statistics for this column chunk.
    pub fn clear_statistics(mut self) -> Self {
        self.0.statistics = None;
        self
    }

    /// Sets page encoding stats for this column chunk.
    pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
        self.0.encoding_stats = Some(value);
        self
    }

    /// Clears the page encoding stats for this column chunk.
    pub fn clear_page_encoding_stats(mut self) -> Self {
        self.0.encoding_stats = None;
        self
    }

    /// Sets optional bloom filter offset in bytes.
    pub fn set_bloom_filter_offset(mut self, value: Option<i64>) -> Self {
        self.0.bloom_filter_offset = value;
        self
    }

    /// Sets optional bloom filter length in bytes.
    pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
        self.0.bloom_filter_length = value;
        self
    }

    /// Sets optional offset index offset in bytes.
    pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
        self.0.offset_index_offset = value;
        self
    }

    /// Sets optional offset index length in bytes.
    pub fn set_offset_index_length(mut self, value: Option<i32>) -> Self {
        self.0.offset_index_length = value;
        self
    }

    /// Sets optional column index offset in bytes.
    pub fn set_column_index_offset(mut self, value: Option<i64>) -> Self {
        self.0.column_index_offset = value;
        self
    }

    /// Sets optional column index length in bytes.
    pub fn set_column_index_length(mut self, value: Option<i32>) -> Self {
        self.0.column_index_length = value;
        self
    }

    /// Sets optional length of variable length data in bytes.
    pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
        self.0.unencoded_byte_array_data_bytes = value;
        self
    }

    /// Sets optional repetition level histogram
    pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
        self.0.repetition_level_histogram = value;
        self
    }

    /// Sets optional definition level histogram
    pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
        self.0.definition_level_histogram = value;
        self
    }

    #[cfg(feature = "encryption")]
    /// Set the encryption metadata for an encrypted column
    pub fn set_column_crypto_metadata(mut self, value: Option<ColumnCryptoMetaData>) -> Self {
        self.0.column_crypto_metadata = value;
        self
    }

    /// Builds column chunk metadata.
    pub fn build(self) -> Result<ColumnChunkMetaData> {
        Ok(self.0)
    }
}

/// Builder for Parquet [`ColumnIndex`], part of the Parquet [PageIndex]
///
/// [PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
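///
/// # Example
///
/// A minimal sketch of accumulating per-page statistics into a thrift
/// [`ColumnIndex`] (real callers would append one entry per page of the
/// column chunk):
///
/// ```
/// # use parquet::file::metadata::ColumnIndexBuilder;
/// let mut builder = ColumnIndexBuilder::new();
/// // append the min/max statistics and null count of each page
/// builder.append(false, vec![1u8], vec![10u8], 0);
/// builder.append(false, vec![11u8], vec![20u8], 1);
/// let column_index = builder.build_to_thrift();
/// ```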
pub struct ColumnIndexBuilder {
    null_pages: Vec<bool>,
    min_values: Vec<Vec<u8>>,
    max_values: Vec<Vec<u8>>,
    null_counts: Vec<i64>,
    boundary_order: BoundaryOrder,
    /// contains the concatenation of the histograms of all pages
    repetition_level_histograms: Option<Vec<i64>>,
    /// contains the concatenation of the histograms of all pages
    definition_level_histograms: Option<Vec<i64>>,
    /// Is the information in the builder valid?
    ///
    /// Set to `false` if any page doesn't have statistics for
    /// some reason, so statistics for that page won't be written to the file.
    /// This might happen if the page is entirely null, or
    /// is a floating point column without any non-NaN values
    /// e.g. <https://github.com/apache/parquet-format/pull/196>
    valid: bool,
}

impl Default for ColumnIndexBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl ColumnIndexBuilder {
    /// Creates a new column index builder.
    pub fn new() -> Self {
        ColumnIndexBuilder {
            null_pages: Vec::new(),
            min_values: Vec::new(),
            max_values: Vec::new(),
            null_counts: Vec::new(),
            boundary_order: BoundaryOrder::UNORDERED,
            repetition_level_histograms: None,
            definition_level_histograms: None,
            valid: true,
        }
    }

    /// Append statistics for the next page
    pub fn append(
        &mut self,
        null_page: bool,
        min_value: Vec<u8>,
        max_value: Vec<u8>,
        null_count: i64,
    ) {
        self.null_pages.push(null_page);
        self.min_values.push(min_value);
        self.max_values.push(max_value);
        self.null_counts.push(null_count);
    }

    /// Append the given page-level histograms to the [`ColumnIndex`] histograms.
    /// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state.
pub fn append_histograms( &mut self, repetition_level_histogram: &Option<LevelHistogram>, definition_level_histogram: &Option<LevelHistogram>, ) { if !self.valid { return; } if let Some(ref rep_lvl_hist) = repetition_level_histogram { let hist = self.repetition_level_histograms.get_or_insert(Vec::new()); hist.reserve(rep_lvl_hist.len()); hist.extend(rep_lvl_hist.values()); } if let Some(ref def_lvl_hist) = definition_level_histogram { let hist = self.definition_level_histograms.get_or_insert(Vec::new()); hist.reserve(def_lvl_hist.len()); hist.extend(def_lvl_hist.values()); } } /// Set the boundary order of the column index pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) { self.boundary_order = boundary_order; } /// Mark this column index as invalid pub fn to_invalid(&mut self) { self.valid = false; } /// Is the information in the builder valid? pub fn valid(&self) -> bool { self.valid } /// Build and get the thrift metadata of column index /// /// Note: callers should check [`Self::valid`] before calling this method pub fn build_to_thrift(self) -> ColumnIndex { ColumnIndex::new( self.null_pages, self.min_values, self.max_values, self.boundary_order, self.null_counts, self.repetition_level_histograms, self.definition_level_histograms, ) } } impl From<ColumnChunkMetaData> for ColumnChunkMetaDataBuilder { fn from(value: ColumnChunkMetaData) -> Self { ColumnChunkMetaDataBuilder(value) } } /// Builder for offset index, part of the Parquet [PageIndex]. /// /// [PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md pub struct OffsetIndexBuilder { offset_array: Vec<i64>, compressed_page_size_array: Vec<i32>, first_row_index_array: Vec<i64>, unencoded_byte_array_data_bytes_array: Option<Vec<i64>>, current_first_row_index: i64, } impl Default for OffsetIndexBuilder { fn default() -> Self { Self::new() } } impl OffsetIndexBuilder { /// Creates a new offset index builder. pub fn new() -> Self { OffsetIndexBuilder { offset_array: Vec::new(), compressed_page_size_array: Vec::new(), first_row_index_array: Vec::new(), unencoded_byte_array_data_bytes_array: None, current_first_row_index: 0, } } /// Append the row count of the next page. pub fn append_row_count(&mut self, row_count: i64) { let current_page_row_index = self.current_first_row_index; self.first_row_index_array.push(current_page_row_index); self.current_first_row_index += row_count; } /// Append the offset and size of the next page. pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) { self.offset_array.push(offset); self.compressed_page_size_array.push(compressed_page_size); } /// Append the unencoded byte array data bytes of the next page. 
pub fn append_unencoded_byte_array_data_bytes( &mut self, unencoded_byte_array_data_bytes: Option<i64>, ) { if let Some(val) = unencoded_byte_array_data_bytes { self.unencoded_byte_array_data_bytes_array .get_or_insert(Vec::new()) .push(val); } } /// Build and get the thrift metadata of offset index pub fn build_to_thrift(self) -> OffsetIndex { let locations = self .offset_array .iter() .zip(self.compressed_page_size_array.iter()) .zip(self.first_row_index_array.iter()) .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index)) .collect::<Vec<_>>(); OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array) } } #[cfg(test)] mod tests { use super::*; use crate::basic::{PageType, SortOrder}; use crate::file::page_index::index::NativeIndex; #[test] fn test_row_group_metadata_thrift_conversion() { let schema_descr = get_test_schema_descr(); let mut columns = vec![]; for ptr in schema_descr.columns() { let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap(); columns.push(column); } let row_group_meta = RowGroupMetaData::builder(schema_descr.clone()) .set_num_rows(1000) .set_total_byte_size(2000) .set_column_metadata(columns) .set_ordinal(1) .build() .unwrap(); let row_group_exp = row_group_meta.to_thrift(); let row_group_res = RowGroupMetaData::from_thrift(schema_descr, row_group_exp.clone()) .unwrap() .to_thrift(); assert_eq!(row_group_res, row_group_exp); } #[test] fn test_row_group_metadata_thrift_conversion_empty() { let schema_descr = get_test_schema_descr(); let row_group_meta = RowGroupMetaData::builder(schema_descr).build(); assert!(row_group_meta.is_err()); if let Err(e) = row_group_meta { assert_eq!( format!("{e}"), "Parquet error: Column length mismatch: 2 != 0" ); } } /// Test reading a corrupted Parquet file with 3 columns in its schema but only 2 in its row group #[test] fn test_row_group_metadata_thrift_corrupted() { let schema_descr_2cols = Arc::new(SchemaDescriptor::new(Arc::new( SchemaType::group_type_builder("schema") .with_fields(vec![ Arc::new( SchemaType::primitive_type_builder("a", Type::INT32) .build() .unwrap(), ), Arc::new( SchemaType::primitive_type_builder("b", Type::INT32) .build() .unwrap(), ), ]) .build() .unwrap(), ))); let schema_descr_3cols = Arc::new(SchemaDescriptor::new(Arc::new( SchemaType::group_type_builder("schema") .with_fields(vec![ Arc::new( SchemaType::primitive_type_builder("a", Type::INT32) .build() .unwrap(), ), Arc::new( SchemaType::primitive_type_builder("b", Type::INT32) .build() .unwrap(), ), Arc::new( SchemaType::primitive_type_builder("c", Type::INT32) .build() .unwrap(), ), ]) .build() .unwrap(), ))); let row_group_meta_2cols = RowGroupMetaData::builder(schema_descr_2cols.clone()) .set_num_rows(1000) .set_total_byte_size(2000) .set_column_metadata(vec![ ColumnChunkMetaData::builder(schema_descr_2cols.column(0)) .build() .unwrap(), ColumnChunkMetaData::builder(schema_descr_2cols.column(1)) .build() .unwrap(), ]) .set_ordinal(1) .build() .unwrap(); let err = RowGroupMetaData::from_thrift(schema_descr_3cols, row_group_meta_2cols.to_thrift()) .unwrap_err() .to_string(); assert_eq!( err, "Parquet error: Column count mismatch. 
Schema has 3 columns while Row Group has 2" ); } #[test] fn test_column_chunk_metadata_thrift_conversion() { let column_descr = get_test_schema_descr().column(0); let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) .set_encodings(vec![Encoding::PLAIN, Encoding::RLE]) .set_file_path("file_path".to_owned()) .set_num_values(1000) .set_compression(Compression::SNAPPY) .set_total_compressed_size(2000) .set_total_uncompressed_size(3000) .set_data_page_offset(4000) .set_dictionary_page_offset(Some(5000)) .set_page_encoding_stats(vec![ PageEncodingStats { page_type: PageType::DATA_PAGE, encoding: Encoding::PLAIN, count: 3, }, PageEncodingStats { page_type: PageType::DATA_PAGE, encoding: Encoding::RLE, count: 5, }, ]) .set_bloom_filter_offset(Some(6000)) .set_bloom_filter_length(Some(25)) .set_offset_index_offset(Some(7000)) .set_offset_index_length(Some(25)) .set_column_index_offset(Some(8000)) .set_column_index_length(Some(25)) .set_unencoded_byte_array_data_bytes(Some(2000)) .set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100]))) .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200]))) .build() .unwrap(); let col_chunk_res = ColumnChunkMetaData::from_thrift(column_descr, col_metadata.to_thrift()).unwrap(); assert_eq!(col_chunk_res, col_metadata); } #[test] fn test_column_chunk_metadata_thrift_conversion_empty() { let column_descr = get_test_schema_descr().column(0); let col_metadata = ColumnChunkMetaData::builder(column_descr.clone()) .build() .unwrap(); let col_chunk_exp = col_metadata.to_thrift(); let col_chunk_res = ColumnChunkMetaData::from_thrift(column_descr, col_chunk_exp.clone()) .unwrap() .to_thrift(); assert_eq!(col_chunk_res, col_chunk_exp); } #[test] fn test_compressed_size() { let schema_descr = get_test_schema_descr(); let mut columns = vec![]; for column_descr in schema_descr.columns() { let column = ColumnChunkMetaData::builder(column_descr.clone()) .set_total_compressed_size(500) .set_total_uncompressed_size(700) .build() .unwrap(); columns.push(column); } let row_group_meta = RowGroupMetaData::builder(schema_descr) .set_num_rows(1000) .set_column_metadata(columns) .build() .unwrap(); let compressed_size_res: i64 = row_group_meta.compressed_size(); let compressed_size_exp: i64 = 1000; assert_eq!(compressed_size_res, compressed_size_exp); } #[test] fn test_memory_size() { let schema_descr = get_test_schema_descr(); let columns = schema_descr .columns() .iter() .map(|column_descr| { ColumnChunkMetaData::builder(column_descr.clone()) .set_statistics(Statistics::new::<i32>(None, None, None, None, false)) .build() }) .collect::<Result<Vec<_>>>() .unwrap(); let row_group_meta = RowGroupMetaData::builder(schema_descr.clone()) .set_num_rows(1000) .set_column_metadata(columns) .build() .unwrap(); let row_group_meta = vec![row_group_meta]; let version = 2; let num_rows = 1000; let created_by = Some(String::from("test harness")); let key_value_metadata = Some(vec![KeyValue::new( String::from("Foo"), Some(String::from("bar")), )]); let column_orders = Some(vec![ ColumnOrder::UNDEFINED, ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED), ]); let file_metadata = FileMetaData::new( version, num_rows, created_by, key_value_metadata, schema_descr.clone(), column_orders, ); // Now, add in Exact Statistics let columns_with_stats = schema_descr .columns() .iter() .map(|column_descr| { ColumnChunkMetaData::builder(column_descr.clone()) .set_statistics(Statistics::new::<i32>( Some(0), Some(100), None, None, false, )) .build() }) 
.collect::<Result<Vec<_>>>() .unwrap(); let row_group_meta_with_stats = RowGroupMetaData::builder(schema_descr) .set_num_rows(1000) .set_column_metadata(columns_with_stats) .build() .unwrap(); let row_group_meta_with_stats = vec![row_group_meta_with_stats]; let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone()) .set_row_groups(row_group_meta_with_stats) .build(); #[cfg(not(feature = "encryption"))] let base_expected_size = 2312; #[cfg(feature = "encryption")] let base_expected_size = 2640; assert_eq!(parquet_meta.memory_size(), base_expected_size); let mut column_index = ColumnIndexBuilder::new(); column_index.append(false, vec![1u8], vec![2u8, 3u8], 4); let column_index = column_index.build_to_thrift(); let native_index = NativeIndex::<bool>::try_new(column_index).unwrap(); // Now, add in OffsetIndex let mut offset_index = OffsetIndexBuilder::new(); offset_index.append_row_count(1); offset_index.append_offset_and_size(2, 3); offset_index.append_unencoded_byte_array_data_bytes(Some(10)); offset_index.append_row_count(1); offset_index.append_offset_and_size(2, 3); offset_index.append_unencoded_byte_array_data_bytes(Some(10)); let offset_index = offset_index.build_to_thrift(); let parquet_meta = ParquetMetaDataBuilder::new(file_metadata) .set_row_groups(row_group_meta) .set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]])) .set_offset_index(Some(vec![vec![ OffsetIndexMetaData::try_new(offset_index).unwrap() ]])) .build(); #[cfg(not(feature = "encryption"))] let bigger_expected_size = 2816; #[cfg(feature = "encryption")] let bigger_expected_size = 3144; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); assert_eq!(parquet_meta.memory_size(), bigger_expected_size); } /// Returns sample schema descriptor so we can create column metadata. fn get_test_schema_descr() -> SchemaDescPtr { let schema = SchemaType::group_type_builder("schema") .with_fields(vec![ Arc::new( SchemaType::primitive_type_builder("a", Type::INT32) .build() .unwrap(), ), Arc::new( SchemaType::primitive_type_builder("b", Type::INT32) .build() .unwrap(), ), ]) .build() .unwrap(); Arc::new(SchemaDescriptor::new(Arc::new(schema))) } }