in src/parquet/column_reader.cc [139:255]
std::shared_ptr<Page> SerializedPageReader::NextPage() {
  // Loop here because there may be unhandled page types that we skip until
  // finding a page that we do know what to do with
  while (seen_num_rows_ < total_num_rows_) {
    int64_t bytes_read = 0;
    int64_t bytes_available = 0;
    uint32_t header_size = 0;
    const uint8_t* buffer;
    uint32_t allowed_page_size = kDefaultPageHeaderSize;

    // Page headers can be very large because they embed page statistics.
    // We try to deserialize a progressively larger buffer until we succeed
    // or exceed the maximum allowed header size.
    while (true) {
      buffer = stream_->Peek(allowed_page_size, &bytes_available);
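      // A zero-byte peek means the stream is exhausted: no more pages.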
      if (bytes_available == 0) {
        return std::shared_ptr<Page>(nullptr);
      }

      // header_size is an in/out argument: we pass in how many bytes were
      // peeked, and DeserializeThriftMsg overwrites it with the number of
      // bytes the serialized header actually occupies.
      header_size = static_cast<uint32_t>(bytes_available);
      try {
        DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
        break;
      } catch (std::exception& e) {
        // Failed to deserialize. Double the allowed page header size and try again
        std::stringstream ss;
        ss << e.what();
        allowed_page_size *= 2;
        if (allowed_page_size > max_page_header_size_) {
          ss << "Deserializing page header failed.\n";
          throw ParquetException(ss.str());
        }
      }
    }
    // Advance the stream past the bytes the header actually consumed.
    stream_->Advance(header_size);
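    // Both sizes come from the page header; they are equal when the column
    // chunk is stored uncompressed.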
    int compressed_len = current_page_header_.compressed_page_size;
    int uncompressed_len = current_page_header_.uncompressed_page_size;

    // Read the compressed data page.
    buffer = stream_->Read(compressed_len, &bytes_read);
    if (bytes_read != compressed_len) {
      std::stringstream ss;
      ss << "Page was smaller (" << bytes_read << ") than expected (" << compressed_len
         << ")";
      ParquetException::EofException(ss.str());
    }
    // Uncompress it if we need to
    if (decompressor_ != nullptr) {
      // Grow the uncompressed buffer if we need to.
      if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
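        // The second argument is shrink_to_fit; passing false means the
        // scratch buffer's capacity is never reduced between pages.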
        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
      }
      PARQUET_THROW_NOT_OK(
          decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
                                    decompression_buffer_->mutable_data()));
      buffer = decompression_buffer_->data();
    }
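    // Buffer wraps the existing memory without copying, so the returned page
    // remains valid only until the next call to NextPage().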
    auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);

    if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
      const format::DictionaryPageHeader& dict_header =
          current_page_header_.dictionary_page_header;
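      // is_sorted is optional in the Thrift definition; treat unset as false.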
      bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;

      return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
                                              FromThrift(dict_header.encoding),
                                              is_sorted);
    } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
      const format::DataPageHeader& header = current_page_header_.data_page_header;

      EncodedStatistics page_statistics;
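      // Copy over only the statistics fields the writer actually populated.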
      if (header.__isset.statistics) {
        const format::Statistics& stats = header.statistics;
        if (stats.__isset.max) {
          page_statistics.set_max(stats.max);
        }
        if (stats.__isset.min) {
          page_statistics.set_min(stats.min);
        }
        if (stats.__isset.null_count) {
          page_statistics.set_null_count(stats.null_count);
        }
        if (stats.__isset.distinct_count) {
          page_statistics.set_distinct_count(stats.distinct_count);
        }
      }
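      // Track progress through the column chunk; for flat schemas there is
      // one value per row.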
      seen_num_rows_ += header.num_values;

      return std::make_shared<DataPage>(
          page_buffer, header.num_values, FromThrift(header.encoding),
          FromThrift(header.definition_level_encoding),
          FromThrift(header.repetition_level_encoding), page_statistics);
    } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
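      // In a V2 page the level bytes are stored first and are never
      // compressed; is_compressed refers only to the values section.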
      bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;

      seen_num_rows_ += header.num_values;

      return std::make_shared<DataPageV2>(
          page_buffer, header.num_values, header.num_nulls, header.num_rows,
          FromThrift(header.encoding), header.definition_levels_byte_length,
          header.repetition_levels_byte_length, is_compressed);
    } else {
      // We don't know what this page type is. We're allowed to skip non-data
      // pages.
      continue;
    }
  }
  return std::shared_ptr<Page>(nullptr);
}