in parquet/src/file/writer.rs [1119:1257]
fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
let mut compressed_pages = vec![];
let mut total_num_values = 0i64;
let codec_options = CodecOptionsBuilder::default()
.set_backward_compatible_lz4(false)
.build();
let mut compressor = create_codec(codec, &codec_options).unwrap();
for page in pages {
let uncompressed_len = page.buffer().len();
let compressed_page = match *page {
Page::DataPage {
ref buf,
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
ref statistics,
} => {
total_num_values += num_values as i64;
let output_buf = compress_helper(compressor.as_mut(), buf.data());
Page::DataPage {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
statistics: from_thrift(
physical_type,
to_thrift(statistics.as_ref()),
)
.unwrap(),
}
}
Page::DataPageV2 {
ref buf,
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
ref statistics,
..
} => {
total_num_values += num_values as i64;
let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
let cmp_buf =
compress_helper(compressor.as_mut(), &buf.data()[offset..]);
let mut output_buf = Vec::from(&buf.data()[..offset]);
output_buf.extend_from_slice(&cmp_buf[..]);
Page::DataPageV2 {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed: compressor.is_some(),
statistics: from_thrift(
physical_type,
to_thrift(statistics.as_ref()),
)
.unwrap(),
}
}
Page::DictionaryPage {
ref buf,
num_values,
encoding,
is_sorted,
} => {
let output_buf = compress_helper(compressor.as_mut(), buf.data());
Page::DictionaryPage {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
is_sorted,
}
}
};
let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
compressed_pages.push(compressed_page);
}
let mut buffer: Vec<u8> = vec![];
let mut result_pages: Vec<Page> = vec![];
{
let mut writer = TrackedWrite::new(&mut buffer);
let mut page_writer = SerializedPageWriter::new(&mut writer);
for page in compressed_pages {
page_writer.write_page(page).unwrap();
}
page_writer.close().unwrap();
}
{
let reader = bytes::Bytes::from(buffer);
let t = types::Type::primitive_type_builder("t", physical_type)
.build()
.unwrap();
let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
let meta = ColumnChunkMetaData::builder(Arc::new(desc))
.set_compression(codec)
.set_total_compressed_size(reader.len() as i64)
.set_num_values(total_num_values)
.build()
.unwrap();
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let mut page_reader = SerializedPageReader::new_with_properties(
Arc::new(reader),
&meta,
total_num_values as usize,
None,
Arc::new(props),
)
.unwrap();
while let Some(page) = page_reader.get_next_page().unwrap() {
result_pages.push(page);
}
}
assert_eq!(result_pages.len(), pages.len());
for i in 0..result_pages.len() {
assert_page(&result_pages[i], &pages[i]);
}
}