in src/parquet/file-serialize-test.cc [56:141]
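// Round-trip serialization test: writes num_rowgroups_ row groups with the
// requested compression codec (the first half through the sequential
// RowGroupWriter API, the second half through the buffered API), then reads
// the file back and verifies the metadata, values, and definition levels.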
void FileSerializeTest(Compression::type codec_type) {
  std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
  auto gnode = std::static_pointer_cast<GroupNode>(this->node_);

  WriterProperties::Builder prop_builder;
  for (int i = 0; i < num_columns_; ++i) {
    prop_builder.compression(this->schema_.Column(i)->name(), codec_type);
  }
  std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();

  auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
  this->GenerateData(rows_per_rowgroup_);
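
  // Write the first half of the row groups with the sequential API: each
  // column writer is obtained in schema order via NextColumn() and closed
  // before moving on to the next column.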
  for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
    RowGroupWriter* row_group_writer = file_writer->AppendRowGroup();
    for (int col = 0; col < num_columns_; ++col) {
      auto column_writer =
          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
      column_writer->WriteBatch(rows_per_rowgroup_, this->def_levels_.data(), nullptr,
                                this->values_ptr_);
      column_writer->Close();
      // Ensure that the column() API, which is specific to BufferedRowGroup,
      // cannot be called on a sequential row group writer.
      ASSERT_THROW(row_group_writer->column(col), ParquetException);
    }
    row_group_writer->Close();
  }

  // Write the second half of the row groups with the buffered API.
  for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
    RowGroupWriter* row_group_writer = file_writer->AppendBufferedRowGroup();
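    // A buffered row group gives random access to its column writers, so the
    // columns can be fed in interleaved batches rather than one full column
    // at a time.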
    for (int batch = 0; batch < (rows_per_rowgroup_ / rows_per_batch_); ++batch) {
      for (int col = 0; col < num_columns_; ++col) {
        auto column_writer =
            static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
        column_writer->WriteBatch(
            rows_per_batch_, this->def_levels_.data() + (batch * rows_per_batch_),
            nullptr, this->values_ptr_ + (batch * rows_per_batch_));
        // Ensure that the NextColumn() API, which is specific to the sequential
        // RowGroup writer, cannot be called on a buffered row group.
        ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
      }
    }
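    // Close every column writer explicitly before closing the buffered row
    // group itself.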
    for (int col = 0; col < num_columns_; ++col) {
      auto column_writer =
          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
      column_writer->Close();
    }
    row_group_writer->Close();
  }

  file_writer->Close();

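  // Read the file back and verify the file-level metadata before checking
  // each row group and column chunk in detail.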
  auto buffer = sink->GetBuffer();
  int num_rows = num_rowgroups_ * rows_per_rowgroup_;
  auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
  auto file_reader = ParquetFileReader::Open(source);
  ASSERT_EQ(num_columns_, file_reader->metadata()->num_columns());
  ASSERT_EQ(num_rowgroups_, file_reader->metadata()->num_row_groups());
  ASSERT_EQ(num_rows, file_reader->metadata()->num_rows());

  for (int rg = 0; rg < num_rowgroups_; ++rg) {
    auto rg_reader = file_reader->RowGroup(rg);
    ASSERT_EQ(num_columns_, rg_reader->metadata()->num_columns());
    ASSERT_EQ(rows_per_rowgroup_, rg_reader->metadata()->num_rows());
    // Check that the specified compression was actually used.
    ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());

    int64_t values_read;
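    // Read each column chunk in full and compare the decoded values and
    // definition levels against the generated input data; no column chunk
    // should report an index page.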
    for (int i = 0; i < num_columns_; ++i) {
      ASSERT_FALSE(rg_reader->metadata()->ColumnChunk(i)->has_index_page());
      std::vector<int16_t> def_levels_out(rows_per_rowgroup_);
      std::vector<int16_t> rep_levels_out(rows_per_rowgroup_);
      auto col_reader =
          std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(i));
      this->SetupValuesOut(rows_per_rowgroup_);
      col_reader->ReadBatch(rows_per_rowgroup_, def_levels_out.data(),
                            rep_levels_out.data(), this->values_out_ptr_, &values_read);
      this->SyncValuesOut();
      ASSERT_EQ(rows_per_rowgroup_, values_read);
      ASSERT_EQ(this->values_, this->values_out_);
      ASSERT_EQ(this->def_levels_, def_levels_out);
    }
  }
}
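
// Usage sketch: this helper is written against a typed GoogleTest fixture
// (note the this-> member access and the TestType parameter), so it would be
// driven from typed test cases roughly like the one below. The fixture and
// test names here are assumptions for illustration; only FileSerializeTest
// itself appears in this excerpt.
//
//   TYPED_TEST(TestSerialize, SmallFileUncompressed) {
//     ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::UNCOMPRESSED));
//   }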