in src/parquet/file_reader.cc [322:374]
// Scans the requested columns of every row group in `reader` and returns the
// number of rows that were read.
//
// Parameters:
//   columns            Indices of the columns to scan. When empty, every column
//                      reported by the file metadata is scanned.
//   column_batch_size  Number of levels/values requested from each call to
//                      ScanAllValues; also sizes the scratch buffers.
//   reader             File reader to scan; must not be null (it is
//                      dereferenced unconditionally).
//
// Returns the row count tallied for the first scanned column, or 0 when there
// are no columns to scan.
//
// Throws ParquetException when the per-column row totals disagree.
int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                         ParquetFileReader* reader) {
  // Scratch buffers for level data, reused across all columns and row groups.
  std::vector<int16_t> rep_levels(column_batch_size);
  std::vector<int16_t> def_levels(column_batch_size);

  int num_columns = static_cast<int>(columns.size());

  // Columns are not specified explicitly: add all columns of the file.
  if (columns.empty()) {
    num_columns = reader->metadata()->num_columns();
    columns.resize(num_columns);
    for (int i = 0; i < num_columns; i++) {
      columns[i] = i;
    }
  }

  // Still no columns (none requested and none in the file). The remainder of
  // the function indexes total_rows[0], so bail out before touching it.
  if (num_columns == 0) {
    return 0;
  }

  // total_rows[k] accumulates the rows seen for columns[k] over all row groups.
  std::vector<int64_t> total_rows(num_columns, 0);

  for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
    auto group_reader = reader->RowGroup(r);
    int col = 0;
    for (const int column_index : columns) {
      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(column_index);
      const size_t value_byte_size =
          GetTypeByteSize(col_reader->descr()->physical_type());
      // Raw byte buffer large enough for one batch of values of this column's type.
      std::vector<uint8_t> values(column_batch_size * value_byte_size);

      // A column with a non-zero max repetition level is counted by its
      // level-0 entries; any other column contributes one row per level read.
      const bool count_by_rep_level = col_reader->descr()->max_repetition_level() > 0;

      int64_t values_read = 0;
      while (col_reader->HasNext()) {
        const int64_t levels_read =
            ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
                          values.data(), &values_read, col_reader.get());
        if (count_by_rep_level) {
          for (int64_t level = 0; level < levels_read; level++) {
            if (rep_levels[level] == 0) {
              total_rows[col]++;
            }
          }
        } else {
          total_rows[col] += levels_read;
        }
      }
      col++;
    }
  }

  // Every scanned column must have produced the same number of rows.
  for (int i = 1; i < num_columns; ++i) {
    if (total_rows[0] != total_rows[i]) {
      throw ParquetException("Parquet error: Total rows among columns do not match");
    }
  }

  return total_rows[0];
}