parquet/src/column/page.rs (311 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Contains Parquet page definitions and the page reader, writer and iterator interfaces.
use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
use crate::format::PageHeader;
use crate::util::memory::ByteBufferPtr;
/// Parquet Page definition.
///
/// List of supported pages.
/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf`, which
/// is used to store the uncompressed bytes of the page. When a page is wrapped in a
/// [`CompressedPage`], its `buf` may hold compressed bytes instead.
#[derive(Clone)]
pub enum Page {
    /// A data page (Thrift `DATA_PAGE`).
    DataPage {
        /// Page bytes.
        buf: ByteBufferPtr,
        /// Number of values in the page.
        num_values: u32,
        /// Encoding of the values in the page.
        encoding: Encoding,
        /// Encoding of the definition levels.
        def_level_encoding: Encoding,
        /// Encoding of the repetition levels.
        rep_level_encoding: Encoding,
        /// Optional page statistics.
        statistics: Option<Statistics>,
    },
    /// A version 2 data page (Thrift `DATA_PAGE_V2`).
    DataPageV2 {
        /// Page bytes.
        buf: ByteBufferPtr,
        /// Number of values in the page.
        num_values: u32,
        /// Encoding of the values in the page.
        encoding: Encoding,
        /// Number of nulls in the page.
        num_nulls: u32,
        /// Number of rows in the page.
        num_rows: u32,
        /// Byte length of the definition levels (Thrift `definition_levels_byte_length`).
        def_levels_byte_len: u32,
        /// Byte length of the repetition levels (Thrift `repetition_levels_byte_length`).
        rep_levels_byte_len: u32,
        /// Value of the Thrift `is_compressed` flag.
        is_compressed: bool,
        /// Optional page statistics.
        statistics: Option<Statistics>,
    },
    /// A dictionary page (Thrift `DICTIONARY_PAGE`).
    DictionaryPage {
        /// Page bytes.
        buf: ByteBufferPtr,
        /// Number of values in the page.
        num_values: u32,
        /// Encoding of the values in the page.
        encoding: Encoding,
        /// Value of the Thrift `is_sorted` flag.
        is_sorted: bool,
    },
}
impl Page {
    /// Maps this variant to its [`PageType`](crate::basic::PageType).
    pub fn page_type(&self) -> PageType {
        match *self {
            Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
            Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
            Page::DataPage { .. } => PageType::DATA_PAGE,
        }
    }

    /// Borrows the page's byte buffer, whatever the variant.
    pub fn buffer(&self) -> &ByteBufferPtr {
        match self {
            Page::DataPage { buf, .. }
            | Page::DataPageV2 { buf, .. }
            | Page::DictionaryPage { buf, .. } => buf,
        }
    }

    /// Number of values stored in the page.
    pub fn num_values(&self) -> u32 {
        match self {
            Page::DataPage { num_values, .. }
            | Page::DataPageV2 { num_values, .. }
            | Page::DictionaryPage { num_values, .. } => *num_values,
        }
    }

    /// The page's value [`Encoding`](crate::basic::Encoding).
    pub fn encoding(&self) -> Encoding {
        match self {
            Page::DataPage { encoding, .. }
            | Page::DataPageV2 { encoding, .. }
            | Page::DictionaryPage { encoding, .. } => *encoding,
        }
    }

    /// Borrows the page [`Statistics`](crate::file::statistics::Statistics), if any.
    ///
    /// Dictionary pages never carry statistics, so they always yield `None`.
    pub fn statistics(&self) -> Option<&Statistics> {
        match self {
            Page::DataPage { statistics, .. } | Page::DataPageV2 { statistics, .. } => {
                statistics.as_ref()
            }
            Page::DictionaryPage { .. } => None,
        }
    }
}
/// A [`Page`] whose buffer may hold compressed bytes, paired with the size that buffer
/// has once uncompressed.
///
/// For data page v1 the buffer is potentially compressed. For data page v2 it is a
/// compressed and concatenated buffer (def levels + rep levels + compressed values).
///
/// A plain [`Page`] is expected to carry an uncompressed buffer; this wrapper is used
/// where the buffer may be compressed.
pub struct CompressedPage {
    compressed_page: Page,
    uncompressed_size: usize,
}

impl CompressedPage {
    /// Wraps `compressed_page` (whose buffer may or may not be compressed) together
    /// with the size of its buffer when uncompressed.
    pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
        CompressedPage {
            uncompressed_size,
            compressed_page,
        }
    }

    /// Page type of the wrapped page.
    pub fn page_type(&self) -> PageType {
        self.compressed_page().page_type()
    }

    /// Borrows the wrapped page, whose buffer is potentially compressed.
    pub fn compressed_page(&self) -> &Page {
        &self.compressed_page
    }

    /// Size of the page buffer in bytes once uncompressed.
    pub fn uncompressed_size(&self) -> usize {
        self.uncompressed_size
    }

    /// Length in bytes of the wrapped page's buffer.
    ///
    /// The buffer is assumed to be compressed. If it is not, this equals the
    /// uncompressed size.
    pub fn compressed_size(&self) -> usize {
        self.compressed_page().buffer().len()
    }

    /// Number of values in the wrapped page.
    pub fn num_values(&self) -> u32 {
        self.compressed_page().num_values()
    }

    /// Encoding of the values in the wrapped page.
    pub fn encoding(&self) -> Encoding {
        self.compressed_page().encoding()
    }

    /// Borrows the (potentially compressed) buffer contents as a byte slice.
    pub fn data(&self) -> &[u8] {
        self.compressed_page().buffer().data()
    }

    /// Builds the Thrift [`PageHeader`] describing this page.
    ///
    /// Exactly one of the per-type sub-headers is populated, chosen by the page variant.
    /// Sizes and counts are narrowed to `i32` with `as` casts, so out-of-range values
    /// are truncated rather than reported. The CRC field is always left unset.
    pub(crate) fn to_thrift_header(&self) -> PageHeader {
        let num_values = self.num_values() as i32;
        let encoding = self.encoding();

        // (v1 data header, v2 data header, dictionary header): only one becomes `Some`.
        let (data_page_header, data_page_header_v2, dictionary_page_header) =
            match self.compressed_page {
                Page::DataPage {
                    def_level_encoding,
                    rep_level_encoding,
                    ref statistics,
                    ..
                } => (
                    Some(crate::format::DataPageHeader {
                        num_values,
                        encoding: encoding.into(),
                        definition_level_encoding: def_level_encoding.into(),
                        repetition_level_encoding: rep_level_encoding.into(),
                        statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
                    }),
                    None,
                    None,
                ),
                Page::DataPageV2 {
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    ref statistics,
                    ..
                } => (
                    None,
                    Some(crate::format::DataPageHeaderV2 {
                        num_values,
                        num_nulls: num_nulls as i32,
                        num_rows: num_rows as i32,
                        encoding: encoding.into(),
                        definition_levels_byte_length: def_levels_byte_len as i32,
                        repetition_levels_byte_length: rep_levels_byte_len as i32,
                        is_compressed: Some(is_compressed),
                        statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
                    }),
                    None,
                ),
                Page::DictionaryPage { is_sorted, .. } => (
                    None,
                    None,
                    Some(crate::format::DictionaryPageHeader {
                        num_values,
                        encoding: encoding.into(),
                        is_sorted: Some(is_sorted),
                    }),
                ),
            };

        PageHeader {
            type_: self.page_type().into(),
            uncompressed_page_size: self.uncompressed_size() as i32,
            compressed_page_size: self.compressed_size() as i32,
            // TODO: Add support for crc checksum
            crc: None,
            data_page_header,
            index_page_header: None,
            dictionary_page_header,
            data_page_header_v2,
        }
    }
}
/// Contains page write metrics.
///
/// Returned by [`PageWriter::write_page`] for each page written.
#[derive(Debug)]
pub struct PageWriteSpec {
    /// Type of the page that was written.
    pub page_type: PageType,
    /// Uncompressed size of the page.
    pub uncompressed_size: usize,
    /// Compressed size of the page.
    pub compressed_size: usize,
    /// Number of values in the page.
    pub num_values: u32,
    /// Offset of the written page, as reported by the page writer.
    pub offset: u64,
    /// Number of bytes written for the page, as reported by the page writer.
    pub bytes_written: u64,
}
impl Default for PageWriteSpec {
fn default() -> Self {
Self::new()
}
}
impl PageWriteSpec {
/// Creates new spec with default page write metrics.
pub fn new() -> Self {
Self {
page_type: PageType::DATA_PAGE,
uncompressed_size: 0,
compressed_size: 0,
num_values: 0,
offset: 0,
bytes_written: 0,
}
}
}
/// Contains metadata for a page.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PageMetadata {
    /// The number of rows within the page, if known.
    pub num_rows: Option<usize>,
    /// The number of levels within the page, if known.
    pub num_levels: Option<usize>,
    /// Whether the page is a dictionary page.
    pub is_dict: bool,
}
impl TryFrom<&PageHeader> for PageMetadata {
    type Error = ParquetError;

    /// Extracts [`PageMetadata`] from a Thrift page header.
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::General`] if the page type is not `DATA_PAGE`,
    /// `DICTIONARY_PAGE` or `DATA_PAGE_V2`, if the sub-header matching the page type
    /// is missing, or if a row/value count cannot be represented as a `usize`.
    fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
        // Validates a Thrift `i32` count instead of using an `as` cast, which would
        // silently wrap a negative value into a huge `usize`.
        fn count(v: i32, field: &str) -> std::result::Result<usize, ParquetError> {
            usize::try_from(v).map_err(|_| {
                ParquetError::General(format!("page header has invalid {field}: {v}"))
            })
        }

        match value.type_ {
            crate::format::PageType::DATA_PAGE => {
                // Headers come from file bytes, so a missing sub-header is an error, not a bug.
                let header = value.data_page_header.as_ref().ok_or_else(|| {
                    ParquetError::General(
                        "DATA_PAGE page header is missing data_page_header".to_string(),
                    )
                })?;
                Ok(PageMetadata {
                    num_rows: None,
                    num_levels: Some(count(header.num_values, "num_values")?),
                    is_dict: false,
                })
            }
            crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
                num_rows: None,
                num_levels: None,
                is_dict: true,
            }),
            crate::format::PageType::DATA_PAGE_V2 => {
                let header = value.data_page_header_v2.as_ref().ok_or_else(|| {
                    ParquetError::General(
                        "DATA_PAGE_V2 page header is missing data_page_header_v2".to_string(),
                    )
                })?;
                Ok(PageMetadata {
                    num_rows: Some(count(header.num_rows, "num_rows")?),
                    num_levels: Some(count(header.num_values, "num_values")?),
                    is_dict: false,
                })
            }
            other => Err(ParquetError::General(format!(
                "page type {other:?} cannot be converted to PageMetadata"
            ))),
        }
    }
}
/// API for reading pages from a column chunk.
///
/// This offers an iterator-like API to get the next page.
pub trait PageReader: Iterator<Item = Result<Page>> + Send {
    /// Gets the next page in the column chunk associated with this reader.
    /// Returns `None` if there are no pages left.
    fn get_next_page(&mut self) -> Result<Option<Page>>;

    /// Gets metadata about the next page.
    ///
    /// Returns an error if no column index information is available.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;

    /// Skips reading the next page.
    ///
    /// Returns an error if no column index information is available.
    fn skip_next_page(&mut self) -> Result<()>;
}
/// API for writing pages in a column chunk.
///
/// It is reasonable to assume that all pages will be written in the correct order, e.g.
/// a dictionary page followed by data pages, or a set of data pages, etc.
pub trait PageWriter: Send {
    /// Writes a page into the output stream/sink.
    /// Returns a `PageWriteSpec` that contains information about the written page's
    /// metrics, including number of bytes, size, number of values, offset, etc.
    ///
    /// This method is called for every compressed page we write into the underlying
    /// buffer, either a data page or a dictionary page. The page is passed by value, so
    /// the writer takes ownership of it.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
    /// Writes column chunk metadata into the output stream/sink.
    ///
    /// This method is called once before the page writer is closed, normally when writes
    /// are finalised in the column writer.
    fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()>;
    /// Closes resources and flushes the underlying sink.
    /// The page writer should not be used after this method is called.
    fn close(&mut self) -> Result<()>;
}
/// An iterator over [`PageReader`]s for one specific column in a parquet file.
///
/// Each item is a boxed [`PageReader`] (or an error). That reader in turn yields the
/// pages of a column chunk.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page() {
        let data_page = Page::DataPage {
            buf: ByteBufferPtr::new(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)),
        };
        assert_eq!(data_page.page_type(), PageType::DATA_PAGE);
        assert_eq!(data_page.buffer().data(), vec![0, 1, 2].as_slice());
        assert_eq!(data_page.num_values(), 10);
        assert_eq!(data_page.encoding(), Encoding::PLAIN);
        assert_eq!(
            data_page.statistics(),
            Some(&Statistics::int32(Some(1), Some(2), None, 1, true))
        );

        let data_page_v2 = Page::DataPageV2 {
            buf: ByteBufferPtr::new(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)),
        };
        assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(data_page_v2.buffer().data(), vec![0, 1, 2].as_slice());
        assert_eq!(data_page_v2.num_values(), 10);
        assert_eq!(data_page_v2.encoding(), Encoding::PLAIN);
        assert_eq!(
            data_page_v2.statistics(),
            Some(&Statistics::int32(Some(1), Some(2), None, 1, true))
        );

        let dict_page = Page::DictionaryPage {
            buf: ByteBufferPtr::new(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict_page.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict_page.buffer().data(), vec![0, 1, 2].as_slice());
        assert_eq!(dict_page.num_values(), 10);
        assert_eq!(dict_page.encoding(), Encoding::PLAIN);
        assert_eq!(dict_page.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        let data_page = Page::DataPage {
            buf: ByteBufferPtr::new(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)),
        };
        let cpage = CompressedPage::new(data_page, 5);
        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &[0, 1, 2]);
    }

    #[test]
    fn test_to_thrift_header_data_page() {
        let data_page = Page::DataPage {
            buf: ByteBufferPtr::new(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: None,
        };
        let header = CompressedPage::new(data_page, 5).to_thrift_header();
        assert!(header.type_ == crate::format::PageType::DATA_PAGE);
        assert_eq!(header.uncompressed_page_size, 5);
        assert_eq!(header.compressed_page_size, 3);
        assert!(header.crc.is_none());
        assert!(header.data_page_header_v2.is_none());
        assert!(header.dictionary_page_header.is_none());
        let data_header = header.data_page_header.as_ref().unwrap();
        assert_eq!(data_header.num_values, 10);

        let meta = PageMetadata::try_from(&header).unwrap();
        assert_eq!(meta.num_rows, None);
        assert_eq!(meta.num_levels, Some(10));
        assert!(!meta.is_dict);
    }

    #[test]
    fn test_to_thrift_header_data_page_v2() {
        let data_page_v2 = Page::DataPageV2 {
            buf: ByteBufferPtr::new(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: None,
        };
        let header = CompressedPage::new(data_page_v2, 5).to_thrift_header();
        assert!(header.type_ == crate::format::PageType::DATA_PAGE_V2);
        assert_eq!(header.uncompressed_page_size, 5);
        assert_eq!(header.compressed_page_size, 3);
        assert!(header.data_page_header.is_none());
        assert!(header.dictionary_page_header.is_none());
        let v2 = header.data_page_header_v2.as_ref().unwrap();
        assert_eq!(v2.num_values, 10);
        assert_eq!(v2.num_nulls, 5);
        assert_eq!(v2.num_rows, 20);
        assert_eq!(v2.definition_levels_byte_length, 30);
        assert_eq!(v2.repetition_levels_byte_length, 40);
        assert_eq!(v2.is_compressed, Some(false));

        let meta = PageMetadata::try_from(&header).unwrap();
        assert_eq!(meta.num_rows, Some(20));
        assert_eq!(meta.num_levels, Some(10));
        assert!(!meta.is_dict);
    }

    #[test]
    fn test_to_thrift_header_dictionary_page() {
        let dict_page = Page::DictionaryPage {
            buf: ByteBufferPtr::new(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: true,
        };
        let header = CompressedPage::new(dict_page, 5).to_thrift_header();
        assert!(header.type_ == crate::format::PageType::DICTIONARY_PAGE);
        assert!(header.data_page_header.is_none());
        assert!(header.data_page_header_v2.is_none());
        let dict_header = header.dictionary_page_header.as_ref().unwrap();
        assert_eq!(dict_header.num_values, 10);
        assert_eq!(dict_header.is_sorted, Some(true));

        let meta = PageMetadata::try_from(&header).unwrap();
        assert_eq!(meta.num_rows, None);
        assert_eq!(meta.num_levels, None);
        assert!(meta.is_dict);
    }

    #[test]
    fn test_page_metadata_unsupported_page_type() {
        let header = PageHeader {
            type_: crate::format::PageType::INDEX_PAGE,
            uncompressed_page_size: 0,
            compressed_page_size: 0,
            crc: None,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            data_page_header_v2: None,
        };
        assert!(PageMetadata::try_from(&header).is_err());
    }
}