in cpp-ch/local-engine/Storages/ch_parquet/arrow/column_reader.cc [438:576]
std::shared_ptr<Page> SerializedPageReader::NextPage() {
ThriftDeserializer deserializer(properties_);
// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
while (seen_num_values_ < total_num_values_) {
uint32_t header_size = 0;
uint32_t allowed_page_size = kDefaultPageHeaderSize;
// Page headers can be very large because of page statistics
// We try to deserialize a larger buffer progressively
// until a maximum allowed header limit
while (true) {
PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size));
if (view.size() == 0) {
return std::shared_ptr<Page>(nullptr);
}
// This gets used, then set by DeserializeThriftMsg
header_size = static_cast<uint32_t>(view.size());
try {
if (crypto_ctx_.meta_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
&data_page_header_aad_);
}
deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
&header_size, ¤t_page_header_,
crypto_ctx_.meta_decryptor);
break;
} catch (std::exception& e) {
// Failed to deserialize. Double the allowed page header size and try again
std::stringstream ss;
ss << e.what();
allowed_page_size *= 2;
if (allowed_page_size > max_page_header_size_) {
ss << "Deserializing page header failed.\n";
throw ParquetException(ss.str());
}
}
}
// Advance the stream offset
PARQUET_THROW_NOT_OK(stream_->Advance(header_size));
int compressed_len = current_page_header_.compressed_page_size;
int uncompressed_len = current_page_header_.uncompressed_page_size;
if (compressed_len < 0 || uncompressed_len < 0) {
throw ParquetException("Invalid page header");
}
EncodedStatistics data_page_statistics;
if (ShouldSkipPage(&data_page_statistics)) {
PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
continue;
}
if (crypto_ctx_.data_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
&data_page_aad_);
}
// Read the compressed data page.
PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len));
if (page_buffer->size() != compressed_len) {
std::stringstream ss;
ss << "Page was smaller (" << page_buffer->size() << ") than expected ("
<< compressed_len << ")";
ParquetException::EofException(ss.str());
}
// Decrypt it if we need to
if (crypto_ctx_.data_decryptor != nullptr) {
PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(),
/*shrink_to_fit=*/false));
compressed_len = crypto_ctx_.data_decryptor->Decrypt(
page_buffer->data(), compressed_len, decryption_buffer_->mutable_data());
page_buffer = decryption_buffer_;
}
// Uncompress and construct the pages to return.
const PageType::type page_type = LoadEnumSafe(¤t_page_header_.type);
if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;
bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
LoadEnumSafe(&dict_header.encoding),
is_sorted);
} else if (page_type == PageType::DATA_PAGE) {
++page_ordinal_;
const format::DataPageHeader& header = current_page_header_.data_page_header;
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
return std::make_shared<DataPageV1>(page_buffer, header.num_values,
LoadEnumSafe(&header.encoding),
LoadEnumSafe(&header.definition_level_encoding),
LoadEnumSafe(&header.repetition_level_encoding),
uncompressed_len, data_page_statistics);
} else if (page_type == PageType::DATA_PAGE_V2) {
++page_ordinal_;
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
// Arrow prior to 3.0.0 set is_compressed to false but still compressed.
bool is_compressed =
(header.__isset.is_compressed ? header.is_compressed : false) ||
always_compressed_;
// Uncompress if needed
int levels_byte_len;
if (AddWithOverflow(header.definition_levels_byte_length,
header.repetition_levels_byte_length, &levels_byte_len)) {
throw ParquetException("Levels size too large (corrupt file?)");
}
// DecompressIfNeeded doesn't take `is_compressed` into account as
// it's page type-agnostic.
if (is_compressed) {
page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len,
uncompressed_len, levels_byte_len);
}
return std::make_shared<DataPageV2>(
page_buffer, header.num_values, header.num_nulls, header.num_rows,
LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
header.repetition_levels_byte_length, uncompressed_len, is_compressed,
data_page_statistics);
} else {
throw ParquetException(
"Internal error, we have already skipped non-data pages in ShouldSkipPage()");
}
}
return std::shared_ptr<Page>(nullptr);
}