fn single_column_reader_test()

in parquet/src/arrow/arrow_reader/mod.rs [2522:2709]


    /// Writes one randomly generated leaf column to a temporary Parquet file,
    /// reads it back through `ParquetRecordBatchReaderBuilder`, and asserts that
    /// every returned batch matches the expected values.
    ///
    /// Type parameters:
    /// * `T` - Parquet physical type of the column.
    /// * `G` - random generator; `G::gen_vec` produces the non-null values.
    /// * `F` - type of `converter`.
    ///
    /// Arguments:
    /// * `opts` - row-group and row counts, null percentage, batch size,
    ///   statistics level, plus the optional row selection, row filter, offset
    ///   and limit that are applied to both the reader and the expected data.
    /// * `rand_max` - forwarded to `G::gen_vec`; also used as the type length
    ///   when `T` is `FIXED_LEN_BYTE_ARRAY`.
    /// * `converted_type` - converted type set on the leaf column.
    /// * `arrow_type` - when `Some`, an arrow field named "leaf" with this data
    ///   type is handed to the file-writing helper.
    /// * `converter` - builds the expected array from a slice of optional values.
    ///
    /// Panics (via `unwrap`/`assert`) when writing or reading fails, or when a
    /// batch differs from the expected data in length, data type or contents.
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // Print out options to facilitate debugging failures on CI
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(), converted_type, arrow_type, opts
        );

        // One definition level per row (0 = null, 1 = present), generated only
        // when a null percentage was requested.
        let def_levels: Option<Vec<Vec<i16>>> = opts.null_percent.as_ref().map(|null_percent| {
            let mut level_rng = rng();
            (0..opts.num_row_groups)
                .map(|_| {
                    (0..opts.num_rows)
                        .map(|_| (level_rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        .collect::<Vec<i16>>()
                })
                .collect()
        });

        // The column is optional exactly when definition levels were generated.
        let repetition = if def_levels.is_some() {
            Repetition::OPTIONAL
        } else {
            Repetition::REQUIRED
        };

        // Random values for each row group; null rows carry no value.
        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|group| {
                let null_count = def_levels.as_ref().map_or(0, |levels| {
                    levels[group].iter().filter(|&&level| level == 0).count()
                });
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        // Type length passed to the schema builder for this physical type.
        let type_length = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        let leaf = Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(type_length)
            .build()
            .unwrap();

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(vec![Arc::new(leaf)])
                .build()
                .unwrap(),
        );

        let arrow_field = arrow_type.map(|data_type| Field::new("leaf", data_type, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        // Read from the start of what was just written.
        file.rewind().unwrap();

        // The page index is only requested when page-level statistics are enabled.
        let use_page_index = opts.enabled_statistics == EnabledStatistics::Page;
        let reader_options = ArrowReaderOptions::new().with_page_index(use_page_index);

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, reader_options).unwrap();

        let expected_data = match opts.row_selections {
            None => {
                // No selection: every written row is expected back.
                let all_rows = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(all_rows.len(), opts.num_rows * opts.num_row_groups);
                all_rows
            }
            Some((selections, row_count)) => {
                let mut remaining = gen_expected_data::<T>(def_levels.as_ref(), &values);

                // Walk the selectors front to back, consuming each run of rows
                // and keeping only the runs that are not skipped.
                let mut selected: Vec<Option<T::T>> = vec![];
                let selectors: VecDeque<RowSelector> = selections.clone().into();
                for selector in selectors {
                    let run = remaining.drain(0..selector.row_count);
                    if !selector.skip {
                        selected.extend(run);
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(selected.len(), row_count);
                selected
            }
        };

        let mut expected_data = match opts.row_filter {
            None => expected_data,
            Some(filter) => {
                // Keep only the rows whose filter flag is true.
                let kept = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(value, keep)| if *keep { Some(value) } else { None })
                    .collect();

                // The predicate replays the same flags, advancing by the size
                // of each batch it is handed.
                let mut consumed = 0;
                let predicate = ArrowPredicateFn::new(ProjectionMask::all(), move |batch| {
                    let rows = batch.num_rows();
                    let mask = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(consumed)
                            .take(rows)
                            .map(|keep| Some(*keep)),
                    );
                    consumed += rows;
                    Ok(mask)
                });

                builder = builder.with_row_filter(RowFilter::new(vec![Box::new(predicate)]));
                kept
            }
        };

        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            // Drop the first `offset` expected rows (all of them if fewer exist).
            let skipped = min(offset, expected_data.len());
            expected_data.drain(..skipped);
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            // Keep at most `limit` expected rows.
            expected_data.truncate(limit);
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        // Each batch must hold the next `record_batch_size` expected rows
        // (fewer for the final batch).
        let mut total_read = 0;
        while total_read < expected_data.len() {
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());

            let expected = converter(&expected_data[total_read..end]);
            let actual = Arc::clone(batch.column(0));

            assert_eq!(expected.data_type(), actual.data_type());
            assert_eq!(expected.to_data(), actual.to_data());
            assert_eq!(
                expected.as_any().type_id(),
                actual.as_any().type_id(),
                "incorrect type ids"
            );

            total_read = end;
        }

        // Once all expected rows were seen the reader must be exhausted.
        assert!(record_reader.next().is_none());
    }