in src/parquet/column_writer.cc [418:484]
void ColumnWriter::AddDataPage() {
int64_t definition_levels_rle_size = 0;
int64_t repetition_levels_rle_size = 0;
std::shared_ptr<Buffer> values = GetValuesBuffer();
if (descr_->max_definition_level() > 0) {
definition_levels_rle_size =
RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
definition_levels_rle_.get(), descr_->max_definition_level());
}
if (descr_->max_repetition_level() > 0) {
repetition_levels_rle_size =
RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
repetition_levels_rle_.get(), descr_->max_repetition_level());
}
int64_t uncompressed_size =
definition_levels_rle_size + repetition_levels_rle_size + values->size();
// Use Arrow::Buffer::shrink_to_fit = false
// underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
// Concatenate data into a single buffer
uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
uncompressed_ptr += repetition_levels_rle_size;
memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
uncompressed_ptr += definition_levels_rle_size;
memcpy(uncompressed_ptr, values->data(), values->size());
EncodedStatistics page_stats = GetPageStatistics();
ResetPageStatistics();
std::shared_ptr<Buffer> compressed_data;
if (pager_->has_compressor()) {
pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
compressed_data = compressed_data_;
} else {
compressed_data = uncompressed_data_;
}
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
std::shared_ptr<Buffer> compressed_data_copy;
PARQUET_THROW_NOT_OK(compressed_data->Copy(0, compressed_data->size(), allocator_,
&compressed_data_copy));
CompressedDataPage page(compressed_data_copy,
static_cast<int32_t>(num_buffered_values_), encoding_,
Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page));
} else { // Eagerly write pages
CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
page_stats);
WriteDataPage(page);
}
// Re-initialize the sinks for next Page.
InitSinks();
num_buffered_values_ = 0;
num_buffered_encoded_values_ = 0;
}