void FileSerializeTest()

in src/parquet/file-serialize-test.cc [56:141]


  // Writes a file with `num_rowgroups_` row groups, requesting `codec_type`
  // compression for every column, then reads it back and checks the metadata
  // and the round-tripped values/definition levels.
  //
  // The first half of the row groups is written through the sequential
  // AppendRowGroup()/NextColumn() API; the remainder goes through the
  // AppendBufferedRowGroup()/column(i) API. Each part also asserts that the
  // accessor belonging to the other writer kind throws ParquetException.
  //
  // Every row group is filled from the same data produced by GenerateData(), so
  // each column of each row group must read back as `values_` / `def_levels_`.
  void FileSerializeTest(Compression::type codec_type) {
    auto sink = std::make_shared<InMemoryOutputStream>();
    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);

    WriterProperties::Builder prop_builder;
    for (int i = 0; i < num_columns_; ++i) {
      prop_builder.compression(this->schema_.Column(i)->name(), codec_type);
    }
    std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();

    auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
    this->GenerateData(rows_per_rowgroup_);

    // Split the row groups between the two writer kinds. Using the difference
    // (instead of num_rowgroups_ / 2 for both halves) keeps the total equal to
    // num_rowgroups_ even when it is odd, matching the read-side assertions.
    const int num_sequential_rowgroups = num_rowgroups_ / 2;
    const int num_buffered_rowgroups = num_rowgroups_ - num_sequential_rowgroups;

    // Sequential row groups: columns are written one after another via NextColumn().
    for (int rg = 0; rg < num_sequential_rowgroups; ++rg) {
      RowGroupWriter* row_group_writer = file_writer->AppendRowGroup();
      for (int col = 0; col < num_columns_; ++col) {
        auto column_writer =
            static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
        column_writer->WriteBatch(rows_per_rowgroup_, this->def_levels_.data(), nullptr,
                                  this->values_ptr_);
        column_writer->Close();
        // Ensure column() API which is specific to BufferedRowGroup cannot be called
        ASSERT_THROW(row_group_writer->column(col), ParquetException);
      }
      row_group_writer->Close();
    }

    // Buffered row groups: columns are addressed by index via column(i), so the
    // batches below are interleaved across columns.
    ASSERT_GT(rows_per_batch_, 0);
    for (int rg = 0; rg < num_buffered_rowgroups; ++rg) {
      RowGroupWriter* row_group_writer = file_writer->AppendBufferedRowGroup();
      // Walk the row group in rows_per_batch_ chunks. The final chunk is
      // shortened when rows_per_rowgroup_ is not a multiple of rows_per_batch_,
      // so every row is written (the read side expects rows_per_rowgroup_ values).
      for (int64_t offset = 0; offset < rows_per_rowgroup_; offset += rows_per_batch_) {
        const int64_t remaining = rows_per_rowgroup_ - offset;
        const int64_t batch_size =
            remaining < rows_per_batch_ ? remaining : static_cast<int64_t>(rows_per_batch_);
        for (int col = 0; col < num_columns_; ++col) {
          auto column_writer =
              static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
          column_writer->WriteBatch(batch_size, this->def_levels_.data() + offset, nullptr,
                                    this->values_ptr_ + offset);
          // Ensure NextColumn() API which is specific to RowGroup cannot be called
          ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
        }
      }
      for (int col = 0; col < num_columns_; ++col) {
        auto column_writer =
            static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
        column_writer->Close();
      }
      row_group_writer->Close();
    }
    file_writer->Close();

    auto buffer = sink->GetBuffer();
    // Computed in 64-bit to avoid int overflow for large fixture parameters.
    const int64_t num_rows = static_cast<int64_t>(num_rowgroups_) * rows_per_rowgroup_;

    auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
    auto file_reader = ParquetFileReader::Open(source);
    auto file_metadata = file_reader->metadata();
    ASSERT_EQ(num_columns_, file_metadata->num_columns());
    ASSERT_EQ(num_rowgroups_, file_metadata->num_row_groups());
    ASSERT_EQ(num_rows, file_metadata->num_rows());

    for (int rg = 0; rg < num_rowgroups_; ++rg) {
      auto rg_reader = file_reader->RowGroup(rg);
      auto rg_metadata = rg_reader->metadata();
      ASSERT_EQ(num_columns_, rg_metadata->num_columns());
      ASSERT_EQ(rows_per_rowgroup_, rg_metadata->num_rows());

      for (int i = 0; i < num_columns_; ++i) {
        auto column_metadata = rg_metadata->ColumnChunk(i);
        // Check that the specified compression was actually used for every
        // column, since all of them were configured with codec_type above.
        ASSERT_EQ(codec_type, column_metadata->compression());
        ASSERT_FALSE(column_metadata->has_index_page());

        std::vector<int16_t> def_levels_out(rows_per_rowgroup_);
        std::vector<int16_t> rep_levels_out(rows_per_rowgroup_);
        auto col_reader =
            std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(i));
        this->SetupValuesOut(rows_per_rowgroup_);
        int64_t values_read = 0;
        col_reader->ReadBatch(rows_per_rowgroup_, def_levels_out.data(),
                              rep_levels_out.data(), this->values_out_ptr_, &values_read);
        this->SyncValuesOut();
        ASSERT_EQ(rows_per_rowgroup_, values_read);
        ASSERT_EQ(this->values_, this->values_out_);
        ASSERT_EQ(this->def_levels_, def_levels_out);
      }
    }
  }