in parquet/src/file/serialized_reader.rs [776:899]
/// Returns the next page of the column chunk, or `Ok(None)` once the chunk is
/// exhausted.
///
/// Two reader states are handled:
///
/// * `Values`: pages are read sequentially starting at `offset`, bounded by
///   `remaining_bytes`. A header previously stored in `next_page_header` is
///   reused instead of being read again. Pages whose header type is
///   `PageType::INDEX_PAGE` are skipped. After a data page `page_ordinal` is
///   incremented, and after a dictionary page `require_dictionary` is cleared.
/// * `Pages`: page locations are consumed one at a time. A stored
///   `dictionary_page` location is yielded first, then `page_locations` in
///   order. Each location's byte span is parsed as a thrift page header
///   followed by the page body.
///
/// # Errors
///
/// Returns an error in any of these cases:
///
/// * a page header cannot be read, or fails `verify_page_header_len` or
///   `verify_page_size`;
/// * fewer than `compressed_page_size` body bytes can be read;
/// * a page location has a negative offset or size;
/// * decryption (with the `encryption` feature) or `decode_page` fails.
fn get_next_page(&mut self) -> Result<Option<Page>> {
    loop {
        let page = match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes: remaining,
                next_page_header,
                page_ordinal,
                require_dictionary,
            } => {
                if *remaining == 0 {
                    return Ok(None);
                }
                let mut read = self.reader.get_read(*offset as u64)?;
                let header = if let Some(header) = next_page_header.take() {
                    // This path does not advance `offset`/`remaining`, and the body
                    // is read from `offset` below. So whoever stored this header must
                    // already have moved `offset` past the header bytes.
                    *header
                } else {
                    #[cfg(feature = "encryption")]
                    let (header_len, header) = if self.crypto_context.is_some() {
                        let crypto_context = page_crypto_context(
                            &self.crypto_context,
                            *page_ordinal,
                            *require_dictionary,
                        )?;
                        read_encrypted_page_header_len(&mut read, crypto_context)?
                    } else {
                        read_page_header_len(&mut read)?
                    };
                    #[cfg(not(feature = "encryption"))]
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    verify_page_header_len(header_len, *remaining)?;
                    *offset += header_len;
                    *remaining -= header_len;
                    header
                };
                verify_page_size(
                    header.compressed_page_size,
                    header.uncompressed_page_size,
                    *remaining,
                )?;
                // NOTE(review): this `as` cast relies on `verify_page_size` having
                // rejected a negative `compressed_page_size` — confirm.
                let data_len = header.compressed_page_size as usize;
                // Move past the page body *before* the index-page skip, so that on
                // the next loop iteration `offset` points at the following header.
                *offset += data_len;
                *remaining -= data_len;
                if header.type_ == PageType::INDEX_PAGE {
                    continue;
                }
                let mut buffer = Vec::with_capacity(data_len);
                let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
                if read != data_len {
                    return Err(eof_err!(
                        "Expected to read {} bytes of page, read only {}",
                        data_len,
                        read
                    ));
                }
                #[cfg(feature = "encryption")]
                let crypto_context = page_crypto_context(
                    &self.crypto_context,
                    *page_ordinal,
                    *require_dictionary,
                )?;
                #[cfg(feature = "encryption")]
                let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
                    let decryptor = crypto_context.data_decryptor();
                    let aad = crypto_context.create_page_aad()?;
                    decryptor.decrypt(buffer.as_ref(), &aad)?
                } else {
                    buffer
                };
                let page = decode_page(
                    header,
                    Bytes::from(buffer),
                    self.physical_type,
                    self.decompressor.as_mut(),
                )?;
                // Advance the ordinal/dictionary bookkeeping used to build the
                // per-page crypto context for subsequent pages.
                if page.is_data_page() {
                    *page_ordinal += 1;
                } else if page.is_dictionary_page() {
                    *require_dictionary = false;
                }
                page
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // The dictionary page location (if any) is served before data pages.
                let front = match dictionary_page
                    .take()
                    .or_else(|| page_locations.pop_front())
                {
                    Some(front) => front,
                    None => return Ok(None),
                };
                let page_len = usize::try_from(front.compressed_page_size)?;
                // Checked conversion: a negative offset (e.g. from a corrupt offset
                // index) must be reported as an error rather than wrapping to a huge
                // u64 via `as`.
                let page_offset = u64::try_from(front.offset)?;
                let buffer = self.reader.get_bytes(page_offset, page_len)?;
                // The fetched span starts with the thrift page header; whatever the
                // header parser does not consume is the page body.
                let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                let header = PageHeader::read_from_in_protocol(&mut prot)?;
                let offset = buffer.len() - prot.as_slice().len();
                let bytes = buffer.slice(offset..);
                // NOTE(review): unlike the `Values` branch, this branch never consults
                // `self.crypto_context`. Confirm that encrypted column chunks cannot
                // reach this state, or pages will be decoded without decryption.
                decode_page(
                    header,
                    bytes,
                    self.physical_type,
                    self.decompressor.as_mut(),
                )?
            }
        };
        return Ok(Some(page));
    }
}