in src/parquet/column_reader.cc [318:414]
// Advances the reader to the next data page and prepares the level and value
// decoders for it.
//
// Pages are pulled from pager_ until a DATA_PAGE is found. Every
// DICTIONARY_PAGE met on the way is handed to ConfigureDictionary(); any other
// page type is skipped.
//
// Returns true once a data page has been set up, and false when pager_ has no
// more pages.
//
// Throws ParquetException when:
//   - the encoded levels claim more bytes than the page holds,
//   - a dictionary-encoded data page arrives and no decoder is registered for
//     RLE_DICTIONARY,
//   - the value encoding is unsupported or unknown.
bool TypedColumnReader<DType>::ReadNewPage() {
  // Loop until we find the next data page.
  while (true) {
    current_page_ = pager_->NextPage();
    if (!current_page_) {
      // EOS
      return false;
    }

    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
      continue;
    }
    if (current_page_->type() != PageType::DATA_PAGE) {
      // We don't know what this page type is. We're allowed to skip non-data
      // pages.
      continue;
    }

    // Read a data page.
    const DataPage* page = static_cast<const DataPage*>(current_page_.get());
    num_buffered_values_ = page->num_values();
    // Have not decoded any values from the data page yet
    num_decoded_values_ = 0;

    // Data page layout: Repetition Levels - Definition Levels - encoded values.
    // Levels are encoded as rle or bit-packed. `buffer` / `data_size` track the
    // not-yet-consumed tail of the page; once both level sections have been
    // stepped over they describe the encoded values.
    const uint8_t* buffer = page->data();
    int64_t data_size = page->size();

    // Steps over one level section. The byte count comes from the level
    // decoder, i.e. from the page contents, so it is validated against what is
    // left of the page before the cursor is moved.
    auto skip_level_bytes = [&buffer, &data_size](int64_t num_bytes) {
      if (num_bytes < 0 || num_bytes > data_size) {
        throw ParquetException(
            "Data page too small for its encoded levels (corrupt page?)");
      }
      buffer += num_bytes;
      data_size -= num_bytes;
    };

    // Init repetition levels
    if (descr_->max_repetition_level() > 0) {
      const int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
          page->repetition_level_encoding(), descr_->max_repetition_level(),
          static_cast<int>(num_buffered_values_), buffer);
      skip_level_bytes(rep_levels_bytes);
    }

    // TODO figure a way to set max_definition_level_ to 0
    // if the initial value is invalid

    // Init definition levels
    if (descr_->max_definition_level() > 0) {
      const int64_t def_levels_bytes = definition_level_decoder_.SetData(
          page->definition_level_encoding(), descr_->max_definition_level(),
          static_cast<int>(num_buffered_values_), buffer);
      skip_level_bytes(def_levels_bytes);
    }

    // Get a decoder object for this page or create a new decoder if this is the
    // first page with this encoding. Every dictionary-index encoding is looked
    // up under the single RLE_DICTIONARY key.
    Encoding::type encoding = page->encoding();
    if (IsDictionaryIndexEncoding(encoding)) {
      encoding = Encoding::RLE_DICTIONARY;
    }

    auto it = decoders_.find(static_cast<int>(encoding));
    if (it != decoders_.end()) {
      // Select the decoder first so that the sanity check below inspects the
      // decoder chosen for this page rather than the one left over from the
      // previous page.
      current_decoder_ = it->second.get();
      if (encoding == Encoding::RLE_DICTIONARY) {
        DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
      }
    } else {
      switch (encoding) {
        case Encoding::PLAIN: {
          std::shared_ptr<DecoderType> decoder =
              std::make_shared<PlainDecoder<DType>>(descr_);
          decoders_[static_cast<int>(encoding)] = decoder;
          current_decoder_ = decoder.get();
          break;
        }
        case Encoding::RLE_DICTIONARY:
          // No decoder registered under RLE_DICTIONARY, so no dictionary has
          // been configured for this data page.
          throw ParquetException("Dictionary page must be before data page.");
        case Encoding::DELTA_BINARY_PACKED:
        case Encoding::DELTA_LENGTH_BYTE_ARRAY:
        case Encoding::DELTA_BYTE_ARRAY:
          // NOTE(review): NYI() is presumably non-returning; if it ever did
          // return, control would reach the default case, which throws too.
          ParquetException::NYI("Unsupported encoding");
        default:
          throw ParquetException("Unknown encoding type.");
      }
    }

    // Whatever is left after the level sections is the encoded values.
    current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                              static_cast<int>(data_size));
    return true;
  }
}