fn consume_batch()

in parquet/src/arrow/array_reader/list_array.rs [84:231]


    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let next_batch_array = self.item_reader.consume_batch()?;
        if next_batch_array.is_empty() {
            return Ok(new_empty_array(&self.data_type));
        }

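        // Definition and repetition levels produced by the item reader alongside
        // the child values decoded above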
        let def_levels = self
            .item_reader
            .get_def_levels()
            .ok_or_else(|| general_err!("item_reader def levels are None."))?;

        let rep_levels = self
            .item_reader
            .get_rep_levels()
            .ok_or_else(|| general_err!("item_reader rep levels are None."))?;

        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
            return Err(general_err!(
                "offset of {} would overflow list array",
                next_batch_array.len()
            ));
        }

        if !rep_levels.is_empty() && rep_levels[0] != 0 {
            // This implies either the source data was invalid, or the leaf column
            // reader did not correctly delimit semantic records
            return Err(general_err!("first repetition level of batch must be 0"));
        }

        // A non-nullable list has a single definition level indicating if the list is empty
        //
        // A nullable list has two definition levels associated with it:
        //
        // The first identifies if the list is null
        // The second identifies if the list is empty
        //
        // The child data returned above contains a padding value for each level that
        // is not fully defined; null and empty lists therefore each correspond to a
        // value in the child array.
        //
        // Whilst a null list may be represented by a non-zero-length slice in the
        // offsets array, an empty list must have zero length. As a result we MUST
        // filter out the padding values corresponding to empty lists, and for
        // consistency we do the same for nulls.
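        //
        // For example, for a nullable list of required values (def_level == 2,
        // rep_level == 1 - illustrative values, the actual levels depend on the
        // schema), the records [[1, 2], null, [], [3]] arrive as:
        //
        //   child values: 1  2  _  _  3   (padded)
        //   def_levels:   2  2  0  1  2
        //   rep_levels:   0  1  0  0  0
        //
        // and are reconstructed below as offsets [0, 2, 2, 2, 3], validity
        // [true, false, true, true] and filtered child values [1, 2, 3].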

        // The output offsets for the computed ListArray
        let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(next_batch_array.len() + 1);

        // The validity mask of the computed ListArray if nullable
        let mut validity = self
            .nullable
            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));

        // The offset into the filtered child data of the current level being considered
        let mut cur_offset = 0;

        // Identifies the start of a run of values to copy from the source child data
        let mut filter_start = None;

        // The number of child values skipped due to empty lists or nulls
        let mut skipped = 0;

        // Builder used to construct the filtered child data, skipping empty lists and nulls
        let data = next_batch_array.to_data();
        let mut child_data_builder =
            MutableArrayData::new(vec![&data], false, next_batch_array.len());

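        // Walk the definition and repetition levels in lockstep, computing the list
        // offsets, the validity mask, and the runs of child values to keep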
        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
            match r.cmp(&self.rep_level) {
                Ordering::Greater => {
                    // Repetition level greater than current => already handled by inner array
                    if *d < self.def_level {
                        return Err(general_err!(
                            "Encountered repetition level too large for definition level"
                        ));
                    }
                }
                Ordering::Equal => {
                    // New value in the current list
                    cur_offset += 1;
                }
                Ordering::Less => {
                    // Create new array slice
                    // Already checked that this cannot overflow
                    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

                    if *d >= self.def_level {
                        // Fully defined value

                        // Record the start of a new run of child values to keep,
                        // if one is not already in progress
                        filter_start.get_or_insert(cur_offset + skipped);

                        cur_offset += 1;

                        if let Some(validity) = validity.as_mut() {
                            validity.append(true)
                        }
                    } else {
                        // Flush the current slice of child values if any
                        if let Some(start) = filter_start.take() {
                            child_data_builder.extend(0, start, cur_offset + skipped);
                        }

                        if let Some(validity) = validity.as_mut() {
                            // The slot is valid (non-null) only if this is an
                            // empty list rather than a null one
                            validity.append(*d + 1 == self.def_level)
                        }

                        skipped += 1;
                    }
                }
            }
            Ok(())
        })?;

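        // Terminate the final list with the offset of the end of the child data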
        list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

        let child_data = if skipped == 0 {
            // No filtered values - can reuse original array
            next_batch_array.to_data()
        } else {
            // One or more filtered values - must build new array
            if let Some(start) = filter_start.take() {
                child_data_builder.extend(0, start, cur_offset + skipped)
            }

            child_data_builder.freeze()
        };

        if cur_offset != child_data.len() {
            return Err(general_err!("Failed to reconstruct list from level data"));
        }

        let value_offsets = Buffer::from(list_offsets.to_byte_slice());

        let mut data_builder = ArrayData::builder(self.get_data_type().clone())
            .len(list_offsets.len() - 1)
            .add_buffer(value_offsets)
            .add_child_data(child_data);

        if let Some(builder) = validity {
            assert_eq!(builder.len(), list_offsets.len() - 1);
            data_builder = data_builder.null_bit_buffer(Some(builder.into()))
        }

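        // Validation is skipped here; the offsets, validity and child data were
        // constructed consistently above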
        let list_data = unsafe { data_builder.build_unchecked() };

        let result_array = GenericListArray::<OffsetSize>::from(list_data);
        Ok(Arc::new(result_array))
    }
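
A minimal standalone sketch of the final assembly step, for reference: building a
ListArray from precomputed offsets, a validity mask, and filtered child data, using
the values from the worked example above. The `item` field name and `Int32` child
type are illustrative assumptions, not taken from this file.

    use std::sync::Arc;
    use arrow::array::{Array, ArrayData, BooleanBufferBuilder, Int32Array, ListArray};
    use arrow::buffer::Buffer;
    use arrow::datatypes::{DataType, Field};

    fn build_example_list() -> ListArray {
        // Filtered child data: padding for the null and empty lists removed
        let child = Int32Array::from(vec![1, 2, 3]).to_data();

        // Offsets computed by the level walk: [[1, 2], null, [], [3]]
        let value_offsets = Buffer::from_slice_ref([0i32, 2, 2, 2, 3]);

        // Validity mask: only the second list is null
        let mut validity = BooleanBufferBuilder::new(4);
        validity.append_slice(&[true, false, true, true]);

        let field = Arc::new(Field::new("item", DataType::Int32, false));
        let list_data = ArrayData::builder(DataType::List(field))
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(child)
            .null_bit_buffer(Some(validity.into()))
            .build()
            .expect("valid list array data");

        ListArray::from(list_data)
    }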