// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Parquet metadata API
//!
//! Most users should use these structures to interact with Parquet metadata.
//! The [crate::format] module contains lower level structures generated from the
//! Parquet thrift definition.
//!
//! * [`ParquetMetaData`]: Top level metadata container, read from the Parquet
//! file footer.
//!
//! * [`FileMetaData`]: File level metadata such as schema, row counts and
//! version.
//!
//! * [`RowGroupMetaData`]: Metadata for each Row Group within a File, such as
//! its location, number of rows, and the column chunks it contains.
//!
//! * [`ColumnChunkMetaData`]: Metadata for each column chunk (primitive leaf)
//! within a Row Group including encoding and compression information,
//! number of values, statistics, etc.
//!
//! # APIs for working with Parquet Metadata
//!
//! The Parquet readers and writers in this crate handle reading and writing
//! metadata into parquet files. To work with metadata directly,
//! the following APIs are available:
//!
//! * [`ParquetMetaDataReader`] for reading
//! * [`ParquetMetaDataWriter`] for writing.
//!
//! [`ParquetMetaDataReader`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html
//! [`ParquetMetaDataWriter`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataWriter.html
//!
//! # Examples
//!
//! Please see [`external_metadata.rs`]
//!
//! [`external_metadata.rs`]: https://github.com/apache/arrow-rs/tree/master/parquet/examples/external_metadata.rs
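//!
//! A minimal sketch of reading metadata from an existing file with
//! [`ParquetMetaDataReader`] and writing it back out to a separate buffer with
//! [`ParquetMetaDataWriter`] (the file path here is illustrative):
//!
//! ```no_run
//! # use parquet::file::metadata::{ParquetMetaDataReader, ParquetMetaDataWriter};
//! let file = std::fs::File::open("some_file.parquet").unwrap();
//! // read the metadata from the file footer, also loading the page indexes
//! let metadata = ParquetMetaDataReader::new()
//!     .with_page_indexes(true)
//!     .parse_and_finish(&file)
//!     .unwrap();
//! // write the metadata out to a separate, standalone buffer
//! let mut buffer = Vec::new();
//! ParquetMetaDataWriter::new(&mut buffer, &metadata)
//!     .finish()
//!     .unwrap();
//! ```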
//!
//! # Metadata Encodings and Structures
//!
//! There are three different encodings of Parquet Metadata in this crate:
//!
//! 1. `bytes`: encoded with the Thrift `TCompactProtocol` as defined in
//! [parquet.thrift]
//!
//! 2. [`format`]: Rust structures automatically generated by the thrift compiler
//! from [parquet.thrift]. These structures are low level and mirror
//! the thrift definitions.
//!
//! 3. [`file::metadata`] (this module): Easier to use Rust structures
//! with a more idiomatic API. Note that, confusingly, some but not all
//! of these structures have the same name as the [`format`] structures.
//!
//! [`format`]: crate::format
//! [`file::metadata`]: crate::file::metadata
//! [parquet.thrift]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
//!
//! Graphically, this is how the different structures relate to each other:
//!
//! ```text
//! ┌──────────────┐      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
//! │              │        ┌─────────────────┐          ┌───────────────────┐
//! │   ..0x24..   │      │ │   ColumnIndex   │ │      │ │  ParquetMetaData  │ │
//! │              │        └─────────────────┘          └───────────────────┘
//! │     ...      │◀────▶│ ┌─────────────────┐ │◀────▶│          ...          │
//! │              │        │   OffsetIndex   │
//! │              │      │ └─────────────────┘ │      │ ┌───────────────────┐ │
//! │              │               ...                   │   FileMetaData*   │
//! │              │      │ ┌─────────────────┐ │      │ └───────────────────┘ │
//! │              │        │  FileMetaData*  │
//! └──────────────┘      │ └─────────────────┘ │       ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
//!                        ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
//!
//!       bytes           format::meta structures     file::metadata structures
//! (thrift encoded)
//!
//!                       * Same name, different struct
//! ```
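//!
//! For example, a [`RowGroupMetaData`] can be converted to and from the
//! corresponding thrift generated structure (a sketch, assuming a
//! [`RowGroupMetaData`] is already available):
//!
//! ```no_run
//! # use parquet::file::metadata::RowGroupMetaData;
//! # fn get_row_group_metadata() -> RowGroupMetaData { unimplemented!(); }
//! let row_group_metadata = get_row_group_metadata();
//! let schema_descr = row_group_metadata.schema_descr_ptr();
//! // convert to the thrift generated structure from `format`
//! let thrift_row_group = row_group_metadata.to_thrift();
//! // ...and back into the idiomatic structure
//! let round_trip = RowGroupMetaData::from_thrift(schema_descr, thrift_row_group).unwrap();
//! ```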
mod memory;
pub(crate) mod reader;
mod writer;
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
#[cfg(feature = "encryption")]
use crate::encryption::{
decrypt::FileDecryptor,
modules::{create_module_aad, ModuleType},
};
use crate::errors::{ParquetError, Result};
#[cfg(feature = "encryption")]
use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{self, Statistics};
use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData;
use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
SizeStatistics, SortingColumn,
};
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
Type as SchemaType,
};
#[cfg(feature = "encryption")]
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
pub use reader::{FooterTail, ParquetMetaDataReader};
use std::ops::Range;
use std::sync::Arc;
pub use writer::ParquetMetaDataWriter;
pub(crate) use writer::ThriftMetadataWriter;
/// Page level statistics for each column chunk of each row group.
///
/// This structure is an in-memory representation of multiple [`ColumnIndex`]
/// structures in a parquet file footer, as described in the Parquet [PageIndex
/// documentation]. Each [`Index`] holds statistics about all the pages in a
/// particular column chunk.
///
/// `column_index[row_group_number][column_number]` holds the
/// [`Index`] corresponding to column `column_number` of row group
/// `row_group_number`.
///
/// For example `column_index[2][3]` holds the [`Index`] for the fourth
/// column in the third row group of the parquet file.
///
/// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub type ParquetColumnIndex = Vec<Vec<Index>>;
/// [`OffsetIndexMetaData`] for each data page of each row group of each column
///
/// This structure is the parsed representation of the [`OffsetIndex`] from the
/// Parquet file footer, as described in the Parquet [PageIndex documentation].
///
/// `offset_index[row_group_number][column_number]` holds
/// the [`OffsetIndexMetaData`] corresponding to column
//! `column_number` of row group `row_group_number`.
///
/// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
/// Parsed metadata for a single Parquet file
///
/// This structure is stored in the footer of Parquet files, in the format
/// defined by [`parquet.thrift`].
///
/// # Overview
/// The fields of this structure are:
/// * [`FileMetaData`]: Information about the overall file (such as the schema) (See [`Self::file_metadata`])
/// * [`RowGroupMetaData`]: Information about each Row Group (see [`Self::row_groups`])
/// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`])
///
/// This structure is read by the various readers in this crate or can be read
/// directly from a file using the [`ParquetMetaDataReader`] struct.
///
/// See the [`ParquetMetaDataBuilder`] to create and modify this structure.
///
/// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
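///
/// # Example
///
/// A minimal sketch of inspecting an already loaded [`ParquetMetaData`]:
///
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaData;
/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
/// let metadata = get_metadata();
/// println!("File has {} rows", metadata.file_metadata().num_rows());
/// for row_group in metadata.row_groups() {
///     println!(
///         "Row group with {} rows and {} compressed bytes",
///         row_group.num_rows(),
///         row_group.compressed_size()
///     );
/// }
/// ```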
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
/// File level metadata
file_metadata: FileMetaData,
/// Row group metadata
row_groups: Vec<RowGroupMetaData>,
/// Page level index for each page in each column chunk
column_index: Option<ParquetColumnIndex>,
/// Offset index for each page in each column chunk
offset_index: Option<ParquetOffsetIndex>,
/// Optional file decryptor
#[cfg(feature = "encryption")]
file_decryptor: Option<FileDecryptor>,
}
impl ParquetMetaData {
/// Creates Parquet metadata from file metadata and a list of row
/// group metadata
pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
ParquetMetaData {
file_metadata,
row_groups,
#[cfg(feature = "encryption")]
file_decryptor: None,
column_index: None,
offset_index: None,
}
}
/// Adds [`FileDecryptor`] to this metadata instance to enable decryption of
/// encrypted data.
#[cfg(feature = "encryption")]
pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) {
self.file_decryptor = file_decryptor;
}
/// Creates Parquet metadata from file metadata, a list of row
/// group metadata, and the column index structures.
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataBuilder")]
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
column_index: Option<ParquetColumnIndex>,
offset_index: Option<ParquetOffsetIndex>,
) -> Self {
ParquetMetaDataBuilder::new(file_metadata)
.set_row_groups(row_groups)
.set_column_index(column_index)
.set_offset_index(offset_index)
.build()
}
/// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`]
pub fn into_builder(self) -> ParquetMetaDataBuilder {
self.into()
}
/// Returns file metadata as reference.
pub fn file_metadata(&self) -> &FileMetaData {
&self.file_metadata
}
/// Returns file decryptor as reference.
#[cfg(feature = "encryption")]
pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> {
self.file_decryptor.as_ref()
}
/// Returns number of row groups in this file.
pub fn num_row_groups(&self) -> usize {
self.row_groups.len()
}
/// Returns row group metadata for the `i`th position.
/// Position should be less than the number of row groups ([`Self::num_row_groups`]).
pub fn row_group(&self, i: usize) -> &RowGroupMetaData {
&self.row_groups[i]
}
/// Returns slice of row groups in this file.
pub fn row_groups(&self) -> &[RowGroupMetaData] {
&self.row_groups
}
/// Returns the column index for this file if loaded
///
/// Returns `None` if the parquet file does not have a `ColumnIndex` or
/// if [ArrowReaderOptions::with_page_index] was set to false.
///
/// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
self.column_index.as_ref()
}
/// Returns offset indexes in this file, if loaded
///
/// Returns `None` if the parquet file does not have an `OffsetIndex` or
/// if [ArrowReaderOptions::with_page_index] was set to false.
///
/// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
self.offset_index.as_ref()
}
/// Estimate of the bytes allocated to store `ParquetMetaData`
///
/// # Notes:
///
/// 1. Includes size of self
///
/// 2. Includes heap memory for sub fields such as [`FileMetaData`] and
/// [`RowGroupMetaData`].
///
/// 3. Includes memory from shared pointers (e.g. [`SchemaDescPtr`]). This
/// means `memory_size` will overestimate the memory size if such pointers
/// are shared.
///
/// 4. Does not include any allocator overheads
pub fn memory_size(&self) -> usize {
std::mem::size_of::<Self>()
+ self.file_metadata.heap_size()
+ self.row_groups.heap_size()
+ self.column_index.heap_size()
+ self.offset_index.heap_size()
}
/// Override the column index
pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
self.column_index = index;
}
/// Override the offset index
pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
self.offset_index = index;
}
}
/// A builder for creating / manipulating [`ParquetMetaData`]
///
/// # Example creating a new [`ParquetMetaData`]
///
/// ```no_run
/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData, RowGroupMetaDataBuilder};
/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); }
/// // Create a new builder given the file metadata
/// let file_metadata = get_file_metadata();
/// // Create a row group
/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr())
/// .set_num_rows(100)
/// // ... (A real row group needs more than just the number of rows)
/// .build()
/// .unwrap();
/// // Create the final metadata
/// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata)
/// .add_row_group(row_group)
/// .build();
/// ```
///
/// # Example modifying an existing [`ParquetMetaData`]
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaData;
/// # fn load_metadata() -> ParquetMetaData { unimplemented!(); }
/// // Modify the metadata so only the last RowGroup remains
/// let metadata: ParquetMetaData = load_metadata();
/// let mut builder = metadata.into_builder();
///
/// // Take existing row groups to modify
/// let mut row_groups = builder.take_row_groups();
/// let last_row_group = row_groups.pop().unwrap();
///
/// let metadata = builder
/// .add_row_group(last_row_group)
/// .build();
/// ```
pub struct ParquetMetaDataBuilder(ParquetMetaData);
impl ParquetMetaDataBuilder {
/// Create a new builder from a file metadata, with no row groups
pub fn new(file_meta_data: FileMetaData) -> Self {
Self(ParquetMetaData::new(file_meta_data, vec![]))
}
/// Create a new builder from an existing ParquetMetaData
pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
Self(metadata)
}
/// Adds a row group to the metadata
pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
self.0.row_groups.push(row_group);
self
}
/// Sets all the row groups to the specified list
pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
self.0.row_groups = row_groups;
self
}
/// Takes ownership of the row groups in this builder, and clears the list
/// of row groups.
///
/// This can be used for more efficient creation of a new ParquetMetaData
/// from an existing one.
pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
std::mem::take(&mut self.0.row_groups)
}
/// Return a reference to the current row groups
pub fn row_groups(&self) -> &[RowGroupMetaData] {
&self.0.row_groups
}
/// Sets the column index
pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
self.0.column_index = column_index;
self
}
/// Returns the current column index from the builder, replacing it with `None`
pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
std::mem::take(&mut self.0.column_index)
}
/// Return a reference to the current column index, if any
pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
self.0.column_index.as_ref()
}
/// Sets the offset index
pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
self.0.offset_index = offset_index;
self
}
/// Returns the current offset index from the builder, replacing it with `None`
pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
std::mem::take(&mut self.0.offset_index)
}
/// Return a reference to the current offset index, if any
pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
self.0.offset_index.as_ref()
}
/// Creates a new ParquetMetaData from the builder
pub fn build(self) -> ParquetMetaData {
let Self(metadata) = self;
metadata
}
}
impl From<ParquetMetaData> for ParquetMetaDataBuilder {
fn from(meta_data: ParquetMetaData) -> Self {
Self(meta_data)
}
}
/// A key-value pair for [`FileMetaData`].
pub type KeyValue = crate::format::KeyValue;
/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
/// File level metadata for a Parquet file.
///
/// Includes the version of the file, metadata, number of rows, schema, and column orders
#[derive(Debug, Clone, PartialEq)]
pub struct FileMetaData {
version: i32,
num_rows: i64,
created_by: Option<String>,
key_value_metadata: Option<Vec<KeyValue>>,
schema_descr: SchemaDescPtr,
column_orders: Option<Vec<ColumnOrder>>,
}
impl FileMetaData {
/// Creates new file metadata.
pub fn new(
version: i32,
num_rows: i64,
created_by: Option<String>,
key_value_metadata: Option<Vec<KeyValue>>,
schema_descr: SchemaDescPtr,
column_orders: Option<Vec<ColumnOrder>>,
) -> Self {
FileMetaData {
version,
num_rows,
created_by,
key_value_metadata,
schema_descr,
column_orders,
}
}
/// Returns version of this file.
pub fn version(&self) -> i32 {
self.version
}
/// Returns number of rows in the file.
pub fn num_rows(&self) -> i64 {
self.num_rows
}
/// String message for the application that wrote this file.
///
/// This should have the following format:
/// `<application> version <application version> (build <application build hash>)`.
///
/// ```shell
/// parquet-mr version 1.8.0 (build 0fda28af84b9746396014ad6a415b90592a98b3b)
/// ```
pub fn created_by(&self) -> Option<&str> {
self.created_by.as_deref()
}
/// Returns key_value_metadata of this file.
pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
self.key_value_metadata.as_ref()
}
/// Returns Parquet [`Type`] that describes schema in this file.
///
/// [`Type`]: crate::schema::types::Type
pub fn schema(&self) -> &SchemaType {
self.schema_descr.root_schema()
}
/// Returns a reference to schema descriptor.
pub fn schema_descr(&self) -> &SchemaDescriptor {
&self.schema_descr
}
/// Returns reference counted clone for schema descriptor.
pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
self.schema_descr.clone()
}
/// Column (sort) order used for `min` and `max` values of each column in this file.
///
/// Each column order corresponds to one column, determined by its position in the
/// list, matching the position of the column in the schema.
///
/// When `None` is returned, there are no column orders available, and each column
/// should be assumed to have undefined (legacy) column order.
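///
/// A minimal sketch of inspecting the column orders, assuming the metadata
/// has already been loaded:
///
/// ```no_run
/// # use parquet::file::metadata::FileMetaData;
/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); }
/// let file_metadata = get_file_metadata();
/// if let Some(orders) = file_metadata.column_orders() {
///     // one entry per leaf column, in schema order
///     println!("File has {} column orders", orders.len());
/// }
/// ```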
pub fn column_orders(&self) -> Option<&Vec<ColumnOrder>> {
self.column_orders.as_ref()
}
/// Returns column order for `i`th column in this file.
/// If column orders are not available, returns undefined (legacy) column order.
pub fn column_order(&self, i: usize) -> ColumnOrder {
self.column_orders
.as_ref()
.map(|data| data[i])
.unwrap_or(ColumnOrder::UNDEFINED)
}
}
/// Reference counted pointer for [`RowGroupMetaData`].
pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
/// Metadata for a row group
///
/// Includes [`ColumnChunkMetaData`] for each column in the row group, the number of rows
/// the total byte size of the row group, and the [`SchemaDescriptor`] for the row group.
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupMetaData {
columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
sorting_columns: Option<Vec<SortingColumn>>,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
/// We can't infer this from the file offset of the first column since there may be empty columns in the row group.
file_offset: Option<i64>,
/// Ordinal position of this row group in file
ordinal: Option<i16>,
}
impl RowGroupMetaData {
/// Returns builder for row group metadata.
pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
RowGroupMetaDataBuilder::new(schema_descr)
}
/// Number of columns in this row group.
pub fn num_columns(&self) -> usize {
self.columns.len()
}
/// Returns column chunk metadata for `i`th column.
pub fn column(&self, i: usize) -> &ColumnChunkMetaData {
&self.columns[i]
}
/// Returns slice of column chunk metadata.
pub fn columns(&self) -> &[ColumnChunkMetaData] {
&self.columns
}
/// Returns mutable slice of column chunk metadata.
pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
&mut self.columns
}
/// Number of rows in this row group.
pub fn num_rows(&self) -> i64 {
self.num_rows
}
/// Returns the sort ordering of the rows in this RowGroup if any
pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
self.sorting_columns.as_ref()
}
/// Total byte size of all uncompressed column data in this row group.
pub fn total_byte_size(&self) -> i64 {
self.total_byte_size
}
/// Total size of all compressed column data in this row group.
pub fn compressed_size(&self) -> i64 {
self.columns.iter().map(|c| c.total_compressed_size).sum()
}
/// Returns reference to a schema descriptor.
pub fn schema_descr(&self) -> &SchemaDescriptor {
self.schema_descr.as_ref()
}
/// Returns reference counted clone of schema descriptor.
pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
self.schema_descr.clone()
}
/// Returns ordinal position of this row group in file.
///
/// For example if this is the first row group in the file, this will return 0.
/// If this is the second row group in the file, this will return 1.
#[inline(always)]
pub fn ordinal(&self) -> Option<i16> {
self.ordinal
}
/// Returns file offset of this row group in file.
#[inline(always)]
pub fn file_offset(&self) -> Option<i64> {
self.file_offset
}
/// Method to convert from encrypted Thrift.
#[cfg(feature = "encryption")]
fn from_encrypted_thrift(
schema_descr: SchemaDescPtr,
mut rg: RowGroup,
decryptor: Option<&FileDecryptor>,
) -> Result<RowGroupMetaData> {
if schema_descr.num_columns() != rg.columns.len() {
return Err(general_err!(
"Column count mismatch. Schema has {} columns while Row Group has {}",
schema_descr.num_columns(),
rg.columns.len()
));
}
let total_byte_size = rg.total_byte_size;
let num_rows = rg.num_rows;
let mut columns = vec![];
for (i, (mut c, d)) in rg
.columns
.drain(0..)
.zip(schema_descr.columns())
.enumerate()
{
// Read encrypted metadata if it's present and we have a decryptor.
if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
let column_decryptor = match c.crypto_metadata.as_ref() {
None => {
return Err(general_err!(
"No crypto_metadata is set for column '{}', which has encrypted metadata",
d.path().string()
));
}
Some(TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
let column_name = crypto_metadata.path_in_schema.join(".");
decryptor.get_column_metadata_decryptor(
column_name.as_str(),
crypto_metadata.key_metadata.as_deref(),
)?
}
Some(TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
decryptor.get_footer_decryptor()?
}
};
let column_aad = create_module_aad(
decryptor.file_aad(),
ModuleType::ColumnMetaData,
rg.ordinal.unwrap() as usize,
i,
None,
)?;
let buf = c.encrypted_column_metadata.clone().unwrap();
let decrypted_cc_buf = column_decryptor
.decrypt(buf.as_slice(), column_aad.as_ref())
.map_err(|_| {
general_err!(
"Unable to decrypt column '{}', perhaps the column key is wrong?",
d.path().string()
)
})?;
let mut prot = TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice());
c.meta_data = Some(ColumnMetaData::read_from_in_protocol(&mut prot)?);
}
columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
}
let sorting_columns = rg.sorting_columns;
Ok(RowGroupMetaData {
columns,
num_rows,
sorting_columns,
total_byte_size,
schema_descr,
file_offset: rg.file_offset,
ordinal: rg.ordinal,
})
}
/// Method to convert from Thrift.
pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result<RowGroupMetaData> {
if schema_descr.num_columns() != rg.columns.len() {
return Err(general_err!(
"Column count mismatch. Schema has {} columns while Row Group has {}",
schema_descr.num_columns(),
rg.columns.len()
));
}
let total_byte_size = rg.total_byte_size;
let num_rows = rg.num_rows;
let mut columns = vec![];
for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
}
let sorting_columns = rg.sorting_columns;
Ok(RowGroupMetaData {
columns,
num_rows,
sorting_columns,
total_byte_size,
schema_descr,
file_offset: rg.file_offset,
ordinal: rg.ordinal,
})
}
/// Method to convert to Thrift.
pub fn to_thrift(&self) -> RowGroup {
RowGroup {
columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
total_byte_size: self.total_byte_size,
num_rows: self.num_rows,
sorting_columns: self.sorting_columns().cloned(),
file_offset: self.file_offset(),
total_compressed_size: Some(self.compressed_size()),
ordinal: self.ordinal,
}
}
/// Converts this [`RowGroupMetaData`] into a [`RowGroupMetaDataBuilder`]
pub fn into_builder(self) -> RowGroupMetaDataBuilder {
RowGroupMetaDataBuilder(self)
}
}
/// Builder for row group metadata.
pub struct RowGroupMetaDataBuilder(RowGroupMetaData);
impl RowGroupMetaDataBuilder {
/// Creates new builder from schema descriptor.
fn new(schema_descr: SchemaDescPtr) -> Self {
Self(RowGroupMetaData {
columns: Vec::with_capacity(schema_descr.num_columns()),
schema_descr,
file_offset: None,
num_rows: 0,
sorting_columns: None,
total_byte_size: 0,
ordinal: None,
})
}
/// Sets number of rows in this row group.
pub fn set_num_rows(mut self, value: i64) -> Self {
self.0.num_rows = value;
self
}
/// Sets the sorting order for columns
pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
self.0.sorting_columns = value;
self
}
/// Sets total size in bytes for this row group.
pub fn set_total_byte_size(mut self, value: i64) -> Self {
self.0.total_byte_size = value;
self
}
/// Takes ownership of the column metadata in this builder, and clears
/// the list of columns.
///
/// This can be used for more efficient creation of a new RowGroupMetaData
/// from an existing one.
pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
std::mem::take(&mut self.0.columns)
}
/// Sets column metadata for this row group.
pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
self.0.columns = value;
self
}
/// Adds a column metadata to this row group
pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
self.0.columns.push(value);
self
}
/// Sets ordinal for this row group.
pub fn set_ordinal(mut self, value: i16) -> Self {
self.0.ordinal = Some(value);
self
}
/// Sets file offset for this row group.
pub fn set_file_offset(mut self, value: i64) -> Self {
self.0.file_offset = Some(value);
self
}
/// Builds row group metadata.
pub fn build(self) -> Result<RowGroupMetaData> {
if self.0.schema_descr.num_columns() != self.0.columns.len() {
return Err(general_err!(
"Column length mismatch: {} != {}",
self.0.schema_descr.num_columns(),
self.0.columns.len()
));
}
Ok(self.0)
}
}
/// Metadata for a column chunk.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
column_descr: ColumnDescPtr,
encodings: Vec<Encoding>,
file_path: Option<String>,
file_offset: i64,
num_values: i64,
compression: Compression,
total_compressed_size: i64,
total_uncompressed_size: i64,
data_page_offset: i64,
index_page_offset: Option<i64>,
dictionary_page_offset: Option<i64>,
statistics: Option<Statistics>,
encoding_stats: Option<Vec<PageEncodingStats>>,
bloom_filter_offset: Option<i64>,
bloom_filter_length: Option<i32>,
offset_index_offset: Option<i64>,
offset_index_length: Option<i32>,
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
unencoded_byte_array_data_bytes: Option<i64>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
#[cfg(feature = "encryption")]
column_crypto_metadata: Option<ColumnCryptoMetaData>,
}
/// Histograms for repetition and definition levels.
///
/// Each histogram is a vector of length `max_level + 1`. The value at index `i` is the number of
/// values at level `i`.
///
/// For example, `vec[0]` is the number of values with level 0, `vec[1]` is the
/// number of values with level 1, and so on.
///
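/// A minimal sketch of building and querying a histogram:
///
/// ```
/// # use parquet::file::metadata::LevelHistogram;
/// // histogram for a column with max_level = 2, so it has length 3
/// let mut hist = LevelHistogram::try_new(2).unwrap();
/// hist.update_from_levels(&[0, 1, 2, 2]);
/// assert_eq!(hist.values(), &[1, 1, 2]);
/// assert_eq!(hist.get(2), Some(2));
/// ```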
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct LevelHistogram {
inner: Vec<i64>,
}
impl LevelHistogram {
/// Creates a new level histogram.
///
/// Length will be `max_level + 1`.
///
/// Returns `None` when `max_level == 0` (because histograms are not necessary in this case)
pub fn try_new(max_level: i16) -> Option<Self> {
if max_level > 0 {
Some(Self {
inner: vec![0; max_level as usize + 1],
})
} else {
None
}
}
/// Returns a reference to the histogram's values.
pub fn values(&self) -> &[i64] {
&self.inner
}
/// Return the inner vector, consuming self
pub fn into_inner(self) -> Vec<i64> {
self.inner
}
/// Returns the histogram value at the given index.
///
/// The value at index `i` is the number of values with level `i`. For example,
/// `get(1)` returns the number of values with level 1.
///
/// Returns `None` if the index is out of bounds.
pub fn get(&self, index: usize) -> Option<i64> {
self.inner.get(index).copied()
}
/// Adds the values from the other histogram to this histogram
///
/// # Panics
/// If the histograms have different lengths
pub fn add(&mut self, other: &Self) {
assert_eq!(self.len(), other.len());
for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) {
*dst += src;
}
}
/// Returns the length of the histogram.
pub fn len(&self) -> usize {
self.inner.len()
}
/// Returns `true` if the histogram is empty.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Sets the values of all histogram levels to 0.
pub fn reset(&mut self) {
for value in self.inner.iter_mut() {
*value = 0;
}
}
/// Updates histogram values using provided repetition levels
///
/// # Panics
/// If any of the levels is greater than `max_level` (the argument
/// supplied to [`Self::try_new`])
pub fn update_from_levels(&mut self, levels: &[i16]) {
for &level in levels {
self.inner[level as usize] += 1;
}
}
}
impl From<Vec<i64>> for LevelHistogram {
fn from(inner: Vec<i64>) -> Self {
Self { inner }
}
}
impl From<LevelHistogram> for Vec<i64> {
fn from(value: LevelHistogram) -> Self {
value.into_inner()
}
}
impl HeapSize for LevelHistogram {
fn heap_size(&self) -> usize {
self.inner.heap_size()
}
}
/// Represents common operations for a column chunk.
impl ColumnChunkMetaData {
/// Returns builder for column chunk metadata.
pub fn builder(column_descr: ColumnDescPtr) -> ColumnChunkMetaDataBuilder {
ColumnChunkMetaDataBuilder::new(column_descr)
}
/// File where the column chunk is stored.
///
/// If not set, assumed to belong to the same file as the metadata.
/// This path is relative to the current file.
pub fn file_path(&self) -> Option<&str> {
self.file_path.as_deref()
}
/// Byte offset of `ColumnMetaData` in `file_path()`.
///
/// Note that the meaning of this field has been inconsistent between implementations
/// so its use has since been deprecated in the Parquet specification. Modern implementations
/// will set this to `0` to indicate that the `ColumnMetaData` is solely contained in the
/// `ColumnChunk` struct.
pub fn file_offset(&self) -> i64 {
self.file_offset
}
/// Type of this column. Must be primitive.
pub fn column_type(&self) -> Type {
self.column_descr.physical_type()
}
/// Path (or identifier) of this column.
pub fn column_path(&self) -> &ColumnPath {
self.column_descr.path()
}
/// Descriptor for this column.
pub fn column_descr(&self) -> &ColumnDescriptor {
self.column_descr.as_ref()
}
/// Reference counted clone of descriptor for this column.
pub fn column_descr_ptr(&self) -> ColumnDescPtr {
self.column_descr.clone()
}
/// All encodings used for this column.
pub fn encodings(&self) -> &Vec<Encoding> {
&self.encodings
}
/// Total number of values in this column chunk.
pub fn num_values(&self) -> i64 {
self.num_values
}
/// Compression for this column.
pub fn compression(&self) -> Compression {
self.compression
}
/// Returns the total compressed data size of this column chunk.
pub fn compressed_size(&self) -> i64 {
self.total_compressed_size
}
/// Returns the total uncompressed data size of this column chunk.
pub fn uncompressed_size(&self) -> i64 {
self.total_uncompressed_size
}
/// Returns the offset for the column data.
pub fn data_page_offset(&self) -> i64 {
self.data_page_offset
}
/// Returns the offset for the index page.
pub fn index_page_offset(&self) -> Option<i64> {
self.index_page_offset
}
/// Returns the offset for the dictionary page, if any.
pub fn dictionary_page_offset(&self) -> Option<i64> {
self.dictionary_page_offset
}
/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> (u64, u64) {
let col_start = match self.dictionary_page_offset() {
Some(dictionary_page_offset) => dictionary_page_offset,
None => self.data_page_offset(),
};
let col_len = self.compressed_size();
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
}
/// Returns statistics that are set for this column chunk,
/// or `None` if no statistics are available.
pub fn statistics(&self) -> Option<&Statistics> {
self.statistics.as_ref()
}
/// Returns the page encoding stats for this column chunk,
/// or `None` if no page encoding stats are available.
pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
self.encoding_stats.as_ref()
}
/// Returns the offset for the bloom filter.
pub fn bloom_filter_offset(&self) -> Option<i64> {
self.bloom_filter_offset
}
/// Returns the length of the bloom filter.
pub fn bloom_filter_length(&self) -> Option<i32> {
self.bloom_filter_length
}
/// Returns the offset for the column index.
pub fn column_index_offset(&self) -> Option<i64> {
self.column_index_offset
}
/// Returns the length of the column index.
pub fn column_index_length(&self) -> Option<i32> {
self.column_index_length
}
/// Returns the byte range of the column index, if any
pub(crate) fn column_index_range(&self) -> Option<Range<u64>> {
let offset = u64::try_from(self.column_index_offset?).ok()?;
let length = u64::try_from(self.column_index_length?).ok()?;
Some(offset..(offset + length))
}
/// Returns the offset for the offset index.
pub fn offset_index_offset(&self) -> Option<i64> {
self.offset_index_offset
}
/// Returns the length of the offset index.
pub fn offset_index_length(&self) -> Option<i32> {
self.offset_index_length
}
/// Returns the byte range of the offset index, if any
pub(crate) fn offset_index_range(&self) -> Option<Range<u64>> {
let offset = u64::try_from(self.offset_index_offset?).ok()?;
let length = u64::try_from(self.offset_index_length?).ok()?;
Some(offset..(offset + length))
}
/// Returns the number of bytes of variable length data after decoding.
///
/// Only set for BYTE_ARRAY columns. This field may not be set by older
/// writers.
pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
self.unencoded_byte_array_data_bytes
}
/// Returns the repetition level histogram.
///
/// The returned value `vec[i]` is how many values are at repetition level `i`. For example,
/// `vec[0]` indicates how many rows the column chunk contains.
/// This field may not be set by older writers.
pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
self.repetition_level_histogram.as_ref()
}
/// Returns the definition level histogram.
///
/// The returned value `vec[i]` is how many values are at definition level `i`. For example,
/// `vec[max_definition_level]` indicates how many non-null values are present in the column chunk.
/// This field may not be set by older writers.
pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
self.definition_level_histogram.as_ref()
}
/// Returns the encryption metadata for this column chunk.
#[cfg(feature = "encryption")]
pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
self.column_crypto_metadata.as_ref()
}
/// Method to convert from Thrift.
pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
if cc.meta_data.is_none() {
return Err(general_err!("Expected to have column metadata"));
}
let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
let column_type = Type::try_from(col_metadata.type_)?;
let encodings = col_metadata
.encodings
.drain(0..)
.map(Encoding::try_from)
.collect::<Result<_>>()?;
let compression = Compression::try_from(col_metadata.codec)?;
let file_path = cc.file_path;
let file_offset = cc.file_offset;
let num_values = col_metadata.num_values;
let total_compressed_size = col_metadata.total_compressed_size;
let total_uncompressed_size = col_metadata.total_uncompressed_size;
let data_page_offset = col_metadata.data_page_offset;
let index_page_offset = col_metadata.index_page_offset;
let dictionary_page_offset = col_metadata.dictionary_page_offset;
let statistics = statistics::from_thrift(column_type, col_metadata.statistics)?;
let encoding_stats = col_metadata
.encoding_stats
.as_ref()
.map(|vec| {
vec.iter()
.map(page_encoding_stats::try_from_thrift)
.collect::<Result<_>>()
})
.transpose()?;
let bloom_filter_offset = col_metadata.bloom_filter_offset;
let bloom_filter_length = col_metadata.bloom_filter_length;
let offset_index_offset = cc.offset_index_offset;
let offset_index_length = cc.offset_index_length;
let column_index_offset = cc.column_index_offset;
let column_index_length = cc.column_index_length;
let (
unencoded_byte_array_data_bytes,
repetition_level_histogram,
definition_level_histogram,
) = if let Some(size_stats) = col_metadata.size_statistics {
(
size_stats.unencoded_byte_array_data_bytes,
size_stats.repetition_level_histogram,
size_stats.definition_level_histogram,
)
} else {
(None, None, None)
};
let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
#[cfg(feature = "encryption")]
let column_crypto_metadata = if let Some(crypto_metadata) = cc.crypto_metadata {
Some(column_crypto_metadata::try_from_thrift(&crypto_metadata)?)
} else {
None
};
let result = ColumnChunkMetaData {
column_descr,
encodings,
file_path,
file_offset,
num_values,
compression,
total_compressed_size,
total_uncompressed_size,
data_page_offset,
index_page_offset,
dictionary_page_offset,
statistics,
encoding_stats,
bloom_filter_offset,
bloom_filter_length,
offset_index_offset,
offset_index_length,
column_index_offset,
column_index_length,
unencoded_byte_array_data_bytes,
repetition_level_histogram,
definition_level_histogram,
#[cfg(feature = "encryption")]
column_crypto_metadata,
};
Ok(result)
}
/// Method to convert to Thrift.
pub fn to_thrift(&self) -> ColumnChunk {
let column_metadata = self.to_column_metadata_thrift();
ColumnChunk {
file_path: self.file_path().map(|s| s.to_owned()),
file_offset: self.file_offset,
meta_data: Some(column_metadata),
offset_index_offset: self.offset_index_offset,
offset_index_length: self.offset_index_length,
column_index_offset: self.column_index_offset,
column_index_length: self.column_index_length,
crypto_metadata: self.column_crypto_metadata_thrift(),
encrypted_column_metadata: None,
}
}
/// Method to convert to Thrift `ColumnMetaData`
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
let size_statistics = if self.unencoded_byte_array_data_bytes.is_some()
|| self.repetition_level_histogram.is_some()
|| self.definition_level_histogram.is_some()
{
let repetition_level_histogram = self
.repetition_level_histogram
.as_ref()
.map(|hist| hist.clone().into_inner());
let definition_level_histogram = self
.definition_level_histogram
.as_ref()
.map(|hist| hist.clone().into_inner());
Some(SizeStatistics {
unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
repetition_level_histogram,
definition_level_histogram,
})
} else {
None
};
ColumnMetaData {
type_: self.column_type().into(),
encodings: self.encodings().iter().map(|&v| v.into()).collect(),
path_in_schema: self.column_path().as_ref().to_vec(),
codec: self.compression.into(),
num_values: self.num_values,
total_uncompressed_size: self.total_uncompressed_size,
total_compressed_size: self.total_compressed_size,
key_value_metadata: None,
data_page_offset: self.data_page_offset,
index_page_offset: self.index_page_offset,
dictionary_page_offset: self.dictionary_page_offset,
statistics: statistics::to_thrift(self.statistics.as_ref()),
encoding_stats: self
.encoding_stats
.as_ref()
.map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
bloom_filter_offset: self.bloom_filter_offset,
bloom_filter_length: self.bloom_filter_length,
size_statistics,
}
}
/// Converts this [`ColumnChunkMetaData`] into a [`ColumnChunkMetaDataBuilder`]
pub fn into_builder(self) -> ColumnChunkMetaDataBuilder {
ColumnChunkMetaDataBuilder::from(self)
}
#[cfg(feature = "encryption")]
fn column_crypto_metadata_thrift(&self) -> Option<TColumnCryptoMetaData> {
self.column_crypto_metadata
.as_ref()
.map(column_crypto_metadata::to_thrift)
}
#[cfg(not(feature = "encryption"))]
fn column_crypto_metadata_thrift(&self) -> Option<TColumnCryptoMetaData> {
None
}
}
/// Builder for [`ColumnChunkMetaData`]
///
/// This builder is used to create a new column chunk metadata or modify an
/// existing one.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::{ColumnChunkMetaData, ColumnChunkMetaDataBuilder};
/// # fn get_column_chunk_metadata() -> ColumnChunkMetaData { unimplemented!(); }
/// let column_chunk_metadata = get_column_chunk_metadata();
/// // create a new builder from existing column chunk metadata
/// let builder = ColumnChunkMetaDataBuilder::from(column_chunk_metadata);
/// // clear the statistics:
/// let column_chunk_metadata: ColumnChunkMetaData = builder
/// .clear_statistics()
/// .build()
/// .unwrap();
/// ```
pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData);
impl ColumnChunkMetaDataBuilder {
/// Creates new column chunk metadata builder.
///
/// See also [`ColumnChunkMetaData::builder`]
fn new(column_descr: ColumnDescPtr) -> Self {
Self(ColumnChunkMetaData {
column_descr,
encodings: Vec::new(),
file_path: None,
file_offset: 0,
num_values: 0,
compression: Compression::UNCOMPRESSED,
total_compressed_size: 0,
total_uncompressed_size: 0,
data_page_offset: 0,
index_page_offset: None,
dictionary_page_offset: None,
statistics: None,
encoding_stats: None,
bloom_filter_offset: None,
bloom_filter_length: None,
offset_index_offset: None,
offset_index_length: None,
column_index_offset: None,
column_index_length: None,
unencoded_byte_array_data_bytes: None,
repetition_level_histogram: None,
definition_level_histogram: None,
#[cfg(feature = "encryption")]
column_crypto_metadata: None,
})
}
/// Sets list of encodings for this column chunk.
pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
self.0.encodings = encodings;
self
}
/// Sets optional file path for this column chunk.
pub fn set_file_path(mut self, value: String) -> Self {
self.0.file_path = Some(value);
self
}
/// Sets file offset in bytes.
///
/// This field was meant to provide an alternative to storing `ColumnMetadata` directly in
/// the `ColumnChunkMetadata`. However, most Parquet readers assume the `ColumnMetadata`
/// is stored inline and ignore this field.
#[deprecated(
since = "53.0.0",
note = "The Parquet specification requires this field to be 0"
)]
pub fn set_file_offset(mut self, value: i64) -> Self {
self.0.file_offset = value;
self
}
/// Sets number of values.
pub fn set_num_values(mut self, value: i64) -> Self {
self.0.num_values = value;
self
}
/// Sets compression.
pub fn set_compression(mut self, value: Compression) -> Self {
self.0.compression = value;
self
}
/// Sets total compressed size in bytes.
pub fn set_total_compressed_size(mut self, value: i64) -> Self {
self.0.total_compressed_size = value;
self
}
/// Sets total uncompressed size in bytes.
pub fn set_total_uncompressed_size(mut self, value: i64) -> Self {
self.0.total_uncompressed_size = value;
self
}
/// Sets data page offset in bytes.
pub fn set_data_page_offset(mut self, value: i64) -> Self {
self.0.data_page_offset = value;
self
}
/// Sets optional dictionary page offset in bytes.
pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
self.0.dictionary_page_offset = value;
self
}
/// Sets optional index page offset in bytes.
pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
self.0.index_page_offset = value;
self
}
/// Sets statistics for this column chunk.
pub fn set_statistics(mut self, value: Statistics) -> Self {
self.0.statistics = Some(value);
self
}
/// Clears the statistics for this column chunk.
pub fn clear_statistics(mut self) -> Self {
self.0.statistics = None;
self
}
/// Sets page encoding stats for this column chunk.
pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
self.0.encoding_stats = Some(value);
self
}
/// Clears the page encoding stats for this column chunk.
pub fn clear_page_encoding_stats(mut self) -> Self {
self.0.encoding_stats = None;
self
}
/// Sets optional bloom filter offset in bytes.
pub fn set_bloom_filter_offset(mut self, value: Option<i64>) -> Self {
self.0.bloom_filter_offset = value;
self
}
/// Sets optional bloom filter length in bytes.
pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
self.0.bloom_filter_length = value;
self
}
/// Sets optional offset index offset in bytes.
pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
self.0.offset_index_offset = value;
self
}
/// Sets optional offset index length in bytes.
pub fn set_offset_index_length(mut self, value: Option<i32>) -> Self {
self.0.offset_index_length = value;
self
}
/// Sets optional column index offset in bytes.
pub fn set_column_index_offset(mut self, value: Option<i64>) -> Self {
self.0.column_index_offset = value;
self
}
/// Sets optional column index length in bytes.
pub fn set_column_index_length(mut self, value: Option<i32>) -> Self {
self.0.column_index_length = value;
self
}
/// Sets optional length of variable length data in bytes.
pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
self.0.unencoded_byte_array_data_bytes = value;
self
}
/// Sets optional repetition level histogram
pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
self.0.repetition_level_histogram = value;
self
}
/// Sets optional definition level histogram
pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
self.0.definition_level_histogram = value;
self
}
#[cfg(feature = "encryption")]
/// Set the encryption metadata for an encrypted column
pub fn set_column_crypto_metadata(mut self, value: Option<ColumnCryptoMetaData>) -> Self {
self.0.column_crypto_metadata = value;
self
}
/// Builds column chunk metadata.
pub fn build(self) -> Result<ColumnChunkMetaData> {
Ok(self.0)
}
}
/// Builder for Parquet [`ColumnIndex`], part of the Parquet [PageIndex]
///
/// [PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
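///
/// A minimal sketch, with made up statistics values:
///
/// ```
/// # use parquet::file::metadata::ColumnIndexBuilder;
/// let mut builder = ColumnIndexBuilder::new();
/// // append the min/max statistics and null count of two pages
/// builder.append(false, vec![1u8], vec![10u8], 0);
/// builder.append(false, vec![11u8], vec![20u8], 1);
/// let column_index = builder.build_to_thrift();
/// assert_eq!(column_index.null_pages.len(), 2);
/// ```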
pub struct ColumnIndexBuilder {
null_pages: Vec<bool>,
min_values: Vec<Vec<u8>>,
max_values: Vec<Vec<u8>>,
null_counts: Vec<i64>,
boundary_order: BoundaryOrder,
/// contains the concatenation of the histograms of all pages
repetition_level_histograms: Option<Vec<i64>>,
/// contains the concatenation of the histograms of all pages
definition_level_histograms: Option<Vec<i64>>,
/// Is the information in the builder valid?
///
/// Set to `false` if any page doesn't have statistics for
/// some reason, so statistics for that page won't be written to the file.
/// This might happen if the page is entirely null, or
/// is a floating point column without any non-NaN values,
/// e.g. <https://github.com/apache/parquet-format/pull/196>
valid: bool,
}
impl Default for ColumnIndexBuilder {
fn default() -> Self {
Self::new()
}
}
impl ColumnIndexBuilder {
/// Creates a new column index builder.
pub fn new() -> Self {
ColumnIndexBuilder {
null_pages: Vec::new(),
min_values: Vec::new(),
max_values: Vec::new(),
null_counts: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
repetition_level_histograms: None,
definition_level_histograms: None,
valid: true,
}
}
/// Append statistics for the next page
pub fn append(
&mut self,
null_page: bool,
min_value: Vec<u8>,
max_value: Vec<u8>,
null_count: i64,
) {
self.null_pages.push(null_page);
self.min_values.push(min_value);
self.max_values.push(max_value);
self.null_counts.push(null_count);
}
/// Append the given page-level histograms to the [`ColumnIndex`] histograms.
/// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state.
pub fn append_histograms(
&mut self,
repetition_level_histogram: &Option<LevelHistogram>,
definition_level_histogram: &Option<LevelHistogram>,
) {
if !self.valid {
return;
}
if let Some(ref rep_lvl_hist) = repetition_level_histogram {
let hist = self.repetition_level_histograms.get_or_insert(Vec::new());
hist.reserve(rep_lvl_hist.len());
hist.extend(rep_lvl_hist.values());
}
if let Some(ref def_lvl_hist) = definition_level_histogram {
let hist = self.definition_level_histograms.get_or_insert(Vec::new());
hist.reserve(def_lvl_hist.len());
hist.extend(def_lvl_hist.values());
}
}
/// Set the boundary order of the column index
pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
self.boundary_order = boundary_order;
}
/// Mark this column index as invalid
pub fn to_invalid(&mut self) {
self.valid = false;
}
/// Is the information in the builder valid?
pub fn valid(&self) -> bool {
self.valid
}
/// Build and get the thrift metadata of column index
///
/// Note: callers should check [`Self::valid`] before calling this method
pub fn build_to_thrift(self) -> ColumnIndex {
ColumnIndex::new(
self.null_pages,
self.min_values,
self.max_values,
self.boundary_order,
self.null_counts,
self.repetition_level_histograms,
self.definition_level_histograms,
)
}
}
impl From<ColumnChunkMetaData> for ColumnChunkMetaDataBuilder {
fn from(value: ColumnChunkMetaData) -> Self {
ColumnChunkMetaDataBuilder(value)
}
}
/// Builder for offset index, part of the Parquet [PageIndex].
///
/// [PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
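///
/// A minimal sketch, with made up page offsets and sizes:
///
/// ```
/// # use parquet::file::metadata::OffsetIndexBuilder;
/// let mut builder = OffsetIndexBuilder::new();
/// // first page: starts at byte offset 4, is 100 compressed bytes, starts at row 0
/// builder.append_offset_and_size(4, 100);
/// builder.append_row_count(10);
/// // second page: starts after the first page and contains the next 5 rows
/// builder.append_offset_and_size(104, 50);
/// builder.append_row_count(5);
/// let offset_index = builder.build_to_thrift();
/// assert_eq!(offset_index.page_locations.len(), 2);
/// ```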
pub struct OffsetIndexBuilder {
offset_array: Vec<i64>,
compressed_page_size_array: Vec<i32>,
first_row_index_array: Vec<i64>,
unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
current_first_row_index: i64,
}
impl Default for OffsetIndexBuilder {
fn default() -> Self {
Self::new()
}
}
impl OffsetIndexBuilder {
/// Creates a new offset index builder.
pub fn new() -> Self {
OffsetIndexBuilder {
offset_array: Vec::new(),
compressed_page_size_array: Vec::new(),
first_row_index_array: Vec::new(),
unencoded_byte_array_data_bytes_array: None,
current_first_row_index: 0,
}
}
/// Append the row count of the next page.
pub fn append_row_count(&mut self, row_count: i64) {
let current_page_row_index = self.current_first_row_index;
self.first_row_index_array.push(current_page_row_index);
self.current_first_row_index += row_count;
}
/// Append the offset and size of the next page.
pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
self.offset_array.push(offset);
self.compressed_page_size_array.push(compressed_page_size);
}
/// Append the unencoded byte array data bytes of the next page.
pub fn append_unencoded_byte_array_data_bytes(
&mut self,
unencoded_byte_array_data_bytes: Option<i64>,
) {
if let Some(val) = unencoded_byte_array_data_bytes {
self.unencoded_byte_array_data_bytes_array
.get_or_insert(Vec::new())
.push(val);
}
}
/// Build and get the thrift metadata of offset index
pub fn build_to_thrift(self) -> OffsetIndex {
let locations = self
.offset_array
.iter()
.zip(self.compressed_page_size_array.iter())
.zip(self.first_row_index_array.iter())
.map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index))
.collect::<Vec<_>>();
OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::basic::{PageType, SortOrder};
use crate::file::page_index::index::NativeIndex;
#[test]
fn test_row_group_metadata_thrift_conversion() {
let schema_descr = get_test_schema_descr();
let mut columns = vec![];
for ptr in schema_descr.columns() {
let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
columns.push(column);
}
let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(1000)
.set_total_byte_size(2000)
.set_column_metadata(columns)
.set_ordinal(1)
.build()
.unwrap();
let row_group_exp = row_group_meta.to_thrift();
let row_group_res = RowGroupMetaData::from_thrift(schema_descr, row_group_exp.clone())
.unwrap()
.to_thrift();
assert_eq!(row_group_res, row_group_exp);
}
#[test]
fn test_row_group_metadata_thrift_conversion_empty() {
let schema_descr = get_test_schema_descr();
let row_group_meta = RowGroupMetaData::builder(schema_descr).build();
assert!(row_group_meta.is_err());
if let Err(e) = row_group_meta {
assert_eq!(
format!("{e}"),
"Parquet error: Column length mismatch: 2 != 0"
);
}
}
/// Test reading a corrupted Parquet file with 3 columns in its schema but only 2 in its row group
#[test]
fn test_row_group_metadata_thrift_corrupted() {
let schema_descr_2cols = Arc::new(SchemaDescriptor::new(Arc::new(
SchemaType::group_type_builder("schema")
.with_fields(vec![
Arc::new(
SchemaType::primitive_type_builder("a", Type::INT32)
.build()
.unwrap(),
),
Arc::new(
SchemaType::primitive_type_builder("b", Type::INT32)
.build()
.unwrap(),
),
])
.build()
.unwrap(),
)));
let schema_descr_3cols = Arc::new(SchemaDescriptor::new(Arc::new(
SchemaType::group_type_builder("schema")
.with_fields(vec![
Arc::new(
SchemaType::primitive_type_builder("a", Type::INT32)
.build()
.unwrap(),
),
Arc::new(
SchemaType::primitive_type_builder("b", Type::INT32)
.build()
.unwrap(),
),
Arc::new(
SchemaType::primitive_type_builder("c", Type::INT32)
.build()
.unwrap(),
),
])
.build()
.unwrap(),
)));
let row_group_meta_2cols = RowGroupMetaData::builder(schema_descr_2cols.clone())
.set_num_rows(1000)
.set_total_byte_size(2000)
.set_column_metadata(vec![
ColumnChunkMetaData::builder(schema_descr_2cols.column(0))
.build()
.unwrap(),
ColumnChunkMetaData::builder(schema_descr_2cols.column(1))
.build()
.unwrap(),
])
.set_ordinal(1)
.build()
.unwrap();
let err =
RowGroupMetaData::from_thrift(schema_descr_3cols, row_group_meta_2cols.to_thrift())
.unwrap_err()
.to_string();
assert_eq!(
err,
"Parquet error: Column count mismatch. Schema has 3 columns while Row Group has 2"
);
}
#[test]
fn test_column_chunk_metadata_thrift_conversion() {
let column_descr = get_test_schema_descr().column(0);
let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
.set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
.set_file_path("file_path".to_owned())
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
.set_dictionary_page_offset(Some(5000))
.set_page_encoding_stats(vec![
PageEncodingStats {
page_type: PageType::DATA_PAGE,
encoding: Encoding::PLAIN,
count: 3,
},
PageEncodingStats {
page_type: PageType::DATA_PAGE,
encoding: Encoding::RLE,
count: 5,
},
])
.set_bloom_filter_offset(Some(6000))
.set_bloom_filter_length(Some(25))
.set_offset_index_offset(Some(7000))
.set_offset_index_length(Some(25))
.set_column_index_offset(Some(8000))
.set_column_index_length(Some(25))
.set_unencoded_byte_array_data_bytes(Some(2000))
.set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
.set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
.build()
.unwrap();
let col_chunk_res =
ColumnChunkMetaData::from_thrift(column_descr, col_metadata.to_thrift()).unwrap();
assert_eq!(col_chunk_res, col_metadata);
}
#[test]
fn test_column_chunk_metadata_thrift_conversion_empty() {
let column_descr = get_test_schema_descr().column(0);
let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
.build()
.unwrap();
let col_chunk_exp = col_metadata.to_thrift();
let col_chunk_res = ColumnChunkMetaData::from_thrift(column_descr, col_chunk_exp.clone())
.unwrap()
.to_thrift();
assert_eq!(col_chunk_res, col_chunk_exp);
}
#[test]
fn test_compressed_size() {
let schema_descr = get_test_schema_descr();
let mut columns = vec![];
for column_descr in schema_descr.columns() {
let column = ColumnChunkMetaData::builder(column_descr.clone())
.set_total_compressed_size(500)
.set_total_uncompressed_size(700)
.build()
.unwrap();
columns.push(column);
}
let row_group_meta = RowGroupMetaData::builder(schema_descr)
.set_num_rows(1000)
.set_column_metadata(columns)
.build()
.unwrap();
let compressed_size_res: i64 = row_group_meta.compressed_size();
let compressed_size_exp: i64 = 1000;
assert_eq!(compressed_size_res, compressed_size_exp);
}
#[test]
fn test_memory_size() {
let schema_descr = get_test_schema_descr();
let columns = schema_descr
.columns()
.iter()
.map(|column_descr| {
ColumnChunkMetaData::builder(column_descr.clone())
.set_statistics(Statistics::new::<i32>(None, None, None, None, false))
.build()
})
.collect::<Result<Vec<_>>>()
.unwrap();
let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(1000)
.set_column_metadata(columns)
.build()
.unwrap();
let row_group_meta = vec![row_group_meta];
let version = 2;
let num_rows = 1000;
let created_by = Some(String::from("test harness"));
let key_value_metadata = Some(vec![KeyValue::new(
String::from("Foo"),
Some(String::from("bar")),
)]);
let column_orders = Some(vec![
ColumnOrder::UNDEFINED,
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
]);
let file_metadata = FileMetaData::new(
version,
num_rows,
created_by,
key_value_metadata,
schema_descr.clone(),
column_orders,
);
// Now, add in Exact Statistics
let columns_with_stats = schema_descr
.columns()
.iter()
.map(|column_descr| {
ColumnChunkMetaData::builder(column_descr.clone())
.set_statistics(Statistics::new::<i32>(
Some(0),
Some(100),
None,
None,
false,
))
.build()
})
.collect::<Result<Vec<_>>>()
.unwrap();
let row_group_meta_with_stats = RowGroupMetaData::builder(schema_descr)
.set_num_rows(1000)
.set_column_metadata(columns_with_stats)
.build()
.unwrap();
let row_group_meta_with_stats = vec![row_group_meta_with_stats];
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
.set_row_groups(row_group_meta_with_stats)
.build();
#[cfg(not(feature = "encryption"))]
let base_expected_size = 2312;
#[cfg(feature = "encryption")]
let base_expected_size = 2640;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
let mut column_index = ColumnIndexBuilder::new();
column_index.append(false, vec![1u8], vec![2u8, 3u8], 4);
let column_index = column_index.build_to_thrift();
let native_index = NativeIndex::<bool>::try_new(column_index).unwrap();
// Now, add in OffsetIndex
let mut offset_index = OffsetIndexBuilder::new();
offset_index.append_row_count(1);
offset_index.append_offset_and_size(2, 3);
offset_index.append_unencoded_byte_array_data_bytes(Some(10));
offset_index.append_row_count(1);
offset_index.append_offset_and_size(2, 3);
offset_index.append_unencoded_byte_array_data_bytes(Some(10));
let offset_index = offset_index.build_to_thrift();
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
.set_row_groups(row_group_meta)
.set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
.set_offset_index(Some(vec![vec![
OffsetIndexMetaData::try_new(offset_index).unwrap()
]]))
.build();
#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 2816;
#[cfg(feature = "encryption")]
let bigger_expected_size = 3144;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
}
/// Returns a sample schema descriptor so we can create column metadata.
fn get_test_schema_descr() -> SchemaDescPtr {
let schema = SchemaType::group_type_builder("schema")
.with_fields(vec![
Arc::new(
SchemaType::primitive_type_builder("a", Type::INT32)
.build()
.unwrap(),
),
Arc::new(
SchemaType::primitive_type_builder("b", Type::INT32)
.build()
.unwrap(),
),
])
.build()
.unwrap();
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}
}