in parquet/src/arrow/arrow_reader/mod.rs [2522:2709]
/// Round-trips a single column of randomly generated data of physical type
/// `T` through a Parquet file and verifies that `ParquetRecordBatchReader`
/// decodes it back to the Arrow array produced by `converter`.
///
/// Steps:
/// 1. Optionally generate per-row-group definition levels (nulls) according
///    to `opts.null_percent`.
/// 2. Generate `opts.num_row_groups` vectors of random values via `G`.
/// 3. Write the data to a temporary Parquet file with the given
///    `converted_type` and optional `arrow_type`.
/// 4. Read the file back — applying any row selection, row filter, offset,
///    and limit from `opts` — and compare every batch against the expected
///    values.
///
/// * `rand_max` — upper bound passed to the value generator; also reused as
///   the byte width when `T` is FIXED_LEN_BYTE_ARRAY (see `len` below)
/// * `converter` — builds the expected Arrow array from a slice of
///   `Option<T::T>` (`None` = null)
fn single_column_reader_test<T, F, G>(
    opts: TestOptions,
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    // Print out options to facilitate debugging failures on CI
    println!(
        "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
        T::get_physical_type(), converted_type, arrow_type, opts
    );

    // According to null_percent, generate def_levels: one Vec<i16> per row
    // group, where 0 marks a null slot and 1 a populated slot. When
    // null_percent is None the column is REQUIRED and carries no levels.
    let (repetition, def_levels) = match opts.null_percent.as_ref() {
        Some(null_percent) => {
            let mut rng = rng();
            let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                .map(|_| {
                    std::iter::from_fn(|| {
                        // 1 with probability (100 - null_percent)%, else 0
                        Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                    })
                    .take(opts.num_rows)
                    .collect()
                })
                .collect();
            (Repetition::OPTIONAL, Some(def_levels))
        }
        None => (Repetition::REQUIRED, None),
    };

    // Generate random table data. Only non-null slots receive a value, so
    // each row group's value vector holds num_rows minus that group's null
    // count.
    let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
        .map(|idx| {
            let null_count = match def_levels.as_ref() {
                Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                None => 0,
            };
            G::gen_vec(rand_max, opts.num_rows - null_count)
        })
        .collect();

    // Fixed-size physical types need an explicit length on the schema node:
    // rand_max doubles as the byte width for FIXED_LEN_BYTE_ARRAY, INT96 is
    // always 12 bytes, and -1 leaves the length unset for everything else.
    let len = match T::get_physical_type() {
        crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
        crate::basic::Type::INT96 => 12,
        _ => -1,
    };

    // Single-leaf Parquet schema: group "test_schema" -> primitive "leaf".
    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(len)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    // Optional Arrow field passed through to the writer when an explicit
    // Arrow type was requested for the column.
    let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

    let mut file = tempfile::tempfile().unwrap();

    generate_single_column_file_with_data::<T>(
        &values,
        def_levels.as_ref(),
        file.try_clone().unwrap(), // Cannot use &mut File (#1163)
        schema,
        arrow_field,
        &opts,
    )
    .unwrap();

    // Rewind so the reader starts from the beginning of what was just written.
    file.rewind().unwrap();

    // Only ask the reader to load the page index when the file was written
    // with page-level statistics enabled.
    let options = ArrowReaderOptions::new()
        .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);

    let mut builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

    let expected_data = match opts.row_selections {
        Some((selections, row_count)) => {
            let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

            // Replay the selection against the flattened expected data.
            // NOTE(review): despite its name, `skip_data` accumulates the rows
            // that are *selected* (kept); skipped rows are drained and dropped.
            let mut skip_data: Vec<Option<T::T>> = vec![];
            let dequeue: VecDeque<RowSelector> = selections.clone().into();
            for select in dequeue {
                if select.skip {
                    without_skip_data.drain(0..select.row_count);
                } else {
                    skip_data.extend(without_skip_data.drain(0..select.row_count));
                }
            }
            builder = builder.with_row_selection(selections);

            // Sanity-check the selection bookkeeping before using it.
            assert_eq!(skip_data.len(), row_count);
            skip_data
        }
        None => {
            // Get the flattened table data covering every row of every row group.
            let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
            expected_data
        }
    };

    let mut expected_data = match opts.row_filter {
        Some(filter) => {
            // Drop the rows the boolean mask excludes from the expected data…
            let expected_data = expected_data
                .into_iter()
                .zip(filter.iter())
                .filter_map(|(d, f)| f.then(|| d))
                .collect();

            // …and install a predicate that replays the same mask batch by
            // batch, with filter_offset tracking how far into the mask the
            // reader has advanced so far.
            let mut filter_offset = 0;
            let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))]);

            builder = builder.with_row_filter(filter);
            expected_data
        }
        None => expected_data,
    };

    // Apply offset/limit identically to the reader and to the expected data.
    if let Some(offset) = opts.offset {
        builder = builder.with_offset(offset);
        expected_data = expected_data.into_iter().skip(offset).collect();
    }

    if let Some(limit) = opts.limit {
        builder = builder.with_limit(limit);
        expected_data = expected_data.into_iter().take(limit).collect();
    }

    let mut record_reader = builder
        .with_batch_size(opts.record_batch_size)
        .build()
        .unwrap();

    // Pull batches until the expected data is exhausted, comparing each batch
    // (row count, datatype, contents, and concrete array type) against the
    // matching slice of expected values; then confirm end-of-stream.
    let mut total_read = 0;
    loop {
        let maybe_batch = record_reader.next();
        if total_read < expected_data.len() {
            // Every batch should be full-sized except possibly the last one.
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = maybe_batch.unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());

            let a = converter(&expected_data[total_read..end]);
            let b = Arc::clone(batch.column(0));

            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.to_data(), b.to_data());
            assert_eq!(
                a.as_any().type_id(),
                b.as_any().type_id(),
                "incorrect type ids"
            );

            total_read = end;
        } else {
            // All expected rows consumed: the reader must report end-of-stream.
            assert!(maybe_batch.is_none());
            break;
        }
    }
}