in src/parquet/arrow/record_reader.cc [440:507]
int64_t ReadRecords(int64_t num_records) override {
// Delimit records, then read values at the end
int64_t records_read = 0;
if (levels_position_ < levels_written_) {
records_read += ReadRecordData(num_records);
}
int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
// If we are in the middle of a record, we continue until reaching the
// desired number of records or the end of the current record if we've found
// enough records
while (!at_record_start_ || records_read < num_records) {
// Is there more data to read in this row group?
if (!HasNext()) {
if (!at_record_start_) {
// We ended the row group while inside a record that we haven't seen
// the end of yet. So increment the record count for the last record in
// the row group
++records_read;
at_record_start_ = true;
}
break;
}
/// We perform multiple batch reads until we either exhaust the row group
/// or observe the desired number of records
int64_t batch_size = std::min(level_batch_size, available_values_current_page());
// No more data in column
if (batch_size == 0) {
break;
}
if (max_def_level_ > 0) {
ReserveLevels(batch_size);
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;
// Not present for non-repeated fields
int64_t levels_read = 0;
if (max_rep_level_ > 0) {
levels_read = ReadDefinitionLevels(batch_size, def_levels);
if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException("Number of decoded rep / def levels did not match");
}
} else if (max_def_level_ > 0) {
levels_read = ReadDefinitionLevels(batch_size, def_levels);
}
// Exhausted column chunk
if (levels_read == 0) {
break;
}
levels_written_ += levels_read;
records_read += ReadRecordData(num_records - records_read);
} else {
// No repetition or definition levels
batch_size = std::min(num_records - records_read, batch_size);
records_read += ReadRecordData(batch_size);
}
}
return records_read;
}