in parquet/src/arrow/arrow_reader/mod.rs [4385:4496]
fn test_list_selection_fuzz() {
let mut rng = rng();
let schema = Arc::new(Schema::new(vec![Field::new_list(
"list",
Field::new_list(
Field::LIST_FIELD_DEFAULT_NAME,
Field::new_list_field(ArrowDataType::Int32, true),
true,
),
true,
)]));
let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
for _ in 0..2048 {
if rng.random_bool(0.2) {
list_a_builder.append(false);
continue;
}
let list_a_len = rng.random_range(0..10);
let list_b_builder = list_a_builder.values();
for _ in 0..list_a_len {
if rng.random_bool(0.2) {
list_b_builder.append(false);
continue;
}
let list_b_len = rng.random_range(0..10);
let int_builder = list_b_builder.values();
for _ in 0..list_b_len {
match rng.random_bool(0.2) {
true => int_builder.append_null(),
false => int_builder.append_value(rng.random()),
}
}
list_b_builder.append(true)
}
list_a_builder.append(true);
}
let array = Arc::new(list_a_builder.finish());
let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
writer.write(&batch).unwrap();
let _metadata = writer.close().unwrap();
let buf = Bytes::from(buf);
let cases = [
vec![
RowSelector::skip(100),
RowSelector::select(924),
RowSelector::skip(100),
RowSelector::select(924),
],
vec![
RowSelector::select(924),
RowSelector::skip(100),
RowSelector::select(924),
RowSelector::skip(100),
],
vec![
RowSelector::skip(1023),
RowSelector::select(1),
RowSelector::skip(1023),
RowSelector::select(1),
],
vec![
RowSelector::select(1),
RowSelector::skip(1023),
RowSelector::select(1),
RowSelector::skip(1023),
],
];
for batch_size in [100, 1024, 2048] {
for selection in &cases {
let selection = RowSelection::from(selection.clone());
let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
.unwrap()
.with_row_selection(selection.clone())
.with_batch_size(batch_size)
.build()
.unwrap();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
assert_eq!(actual.num_rows(), selection.row_count());
let mut batch_offset = 0;
let mut actual_offset = 0;
for selector in selection.iter() {
if selector.skip {
batch_offset += selector.row_count;
continue;
}
assert_eq!(
batch.slice(batch_offset, selector.row_count),
actual.slice(actual_offset, selector.row_count)
);
batch_offset += selector.row_count;
actual_offset += selector.row_count;
}
}
}
}