int64_t ReadRecords()

in src/parquet/arrow/record_reader.cc [440:507]


  int64_t ReadRecords(int64_t num_records) override {
    // Delimit records, then read values at the end
    int64_t records_read = 0;

    if (levels_position_ < levels_written_) {
      records_read += ReadRecordData(num_records);
    }

    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);

    // If we are in the middle of a record, we continue until reaching the
    // desired number of records or the end of the current record if we've found
    // enough records
    while (!at_record_start_ || records_read < num_records) {
      // Is there more data to read in this row group?
      if (!HasNext()) {
        if (!at_record_start_) {
          // We ended the row group while inside a record that we haven't seen
          // the end of yet. So increment the record count for the last record in
          // the row group
          ++records_read;
          at_record_start_ = true;
        }
        break;
      }

      /// We perform multiple batch reads until we either exhaust the row group
      /// or observe the desired number of records
      int64_t batch_size = std::min(level_batch_size, available_values_current_page());

      // No more data in column
      if (batch_size == 0) {
        break;
      }

      if (max_def_level_ > 0) {
        ReserveLevels(batch_size);

        int16_t* def_levels = this->def_levels() + levels_written_;
        int16_t* rep_levels = this->rep_levels() + levels_written_;

        // Not present for non-repeated fields
        int64_t levels_read = 0;
        if (max_rep_level_ > 0) {
          levels_read = ReadDefinitionLevels(batch_size, def_levels);
          if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
            throw ParquetException("Number of decoded rep / def levels did not match");
          }
        } else if (max_def_level_ > 0) {
          levels_read = ReadDefinitionLevels(batch_size, def_levels);
        }

        // Exhausted column chunk
        if (levels_read == 0) {
          break;
        }

        levels_written_ += levels_read;
        records_read += ReadRecordData(num_records - records_read);
      } else {
        // No repetition or definition levels
        batch_size = std::min(num_records - records_read, batch_size);
        records_read += ReadRecordData(batch_size);
      }
    }

    return records_read;
  }