fn get_next_page()

in parquet/src/file/serialized_reader.rs [776:899]


    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_ordinal,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        #[cfg(feature = "encryption")]
                        let (header_len, header) = if self.crypto_context.is_some() {
                            let crypto_context = page_crypto_context(
                                &self.crypto_context,
                                *page_ordinal,
                                *require_dictionary,
                            )?;
                            read_encrypted_page_header_len(&mut read, crypto_context)?
                        } else {
                            read_page_header_len(&mut read)?
                        };

                        #[cfg(not(feature = "encryption"))]
                        let (header_len, header) = read_page_header_len(&mut read)?;

                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len;
                    *remaining -= data_len;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    #[cfg(feature = "encryption")]
                    let crypto_context = page_crypto_context(
                        &self.crypto_context,
                        *page_ordinal,
                        *require_dictionary,
                    )?;
                    #[cfg(feature = "encryption")]
                    let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
                        let decryptor = crypto_context.data_decryptor();
                        let aad = crypto_context.create_page_aad()?;
                        decryptor.decrypt(buffer.as_ref(), &aad)?
                    } else {
                        buffer
                    };

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_ordinal += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }