fn consume_batch()

in parquet/src/arrow/array_reader/fixed_size_list_array.rs [78:209]


    /// Consumes the batch buffered by the item reader and assembles it into a
    /// fixed-size list array.
    ///
    /// The definition and repetition levels are walked pairwise. Each entry
    /// whose repetition level is below `self.rep_level` opens a new list row;
    /// entries at exactly `self.rep_level` are further items of the current
    /// row; entries above it are only sanity-checked. Child values of valid
    /// rows are copied in contiguous runs, and every non-valid row is padded
    /// with `self.fixed_size` null child slots.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - the definition or repetition levels are unavailable,
    /// - the first repetition level of the batch is not 0,
    /// - an entry above `self.rep_level` has a definition level below
    ///   `self.def_level`,
    /// - a row inside a run of valid rows does not hold exactly
    ///   `self.fixed_size` items, or
    /// - the assembled child length is not `rows * self.fixed_size`.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let items = self.item_reader.consume_batch()?;
        if items.is_empty() {
            return Ok(new_empty_array(&self.data_type));
        }

        let def_levels = match self.get_def_levels() {
            Some(levels) => levels,
            None => return Err(general_err!("item_reader def levels are None")),
        };
        let rep_levels = match self.get_rep_levels() {
            Some(levels) => levels,
            None => return Err(general_err!("item_reader rep levels are None")),
        };

        // A non-zero leading repetition level means the batch starts in the
        // middle of a record: either the source data is invalid or the leaf
        // reader did not delimit records correctly.
        if rep_levels.first().map_or(false, |first| *first != 0) {
            return Err(general_err!("first repetition level of batch must be 0"));
        }

        let item_count = items.len();

        // Row-level validity is only tracked for nullable lists.
        let mut validity = if self.nullable {
            Some(BooleanBufferBuilder::new(item_count))
        } else {
            None
        };

        let item_data = items.to_data();
        let mut child_builder = MutableArrayData::new(vec![&item_data], true, item_count);

        // Position of the next entry in the child array
        let mut item_pos = 0;
        // Number of list rows seen so far, valid or not
        let mut row_count = 0;
        // Child position where the pending (not yet copied) run of valid rows began
        let mut run_start = None;
        // Number of items counted for the row currently being read
        let mut current_row_len = 0;

        for (def, rep) in def_levels.iter().zip(rep_levels) {
            match rep.cmp(&self.rep_level) {
                Ordering::Less => {
                    // This entry opens a new list row.
                    row_count += 1;

                    // While inside a run of valid rows, the row that just
                    // ended must hold exactly `fixed_size` items. Outside a
                    // run (after a null row, or at the very start) the check
                    // does not apply.
                    let inside_run = run_start.is_some();
                    if inside_run && current_row_len != self.fixed_size {
                        return Err(general_err!(
                            "Encountered misaligned row with length {} (expected length {})",
                            current_row_len,
                            self.fixed_size
                        ));
                    }

                    if *def >= self.def_level {
                        // Valid row; this entry is its first item.
                        current_row_len = 1;

                        if let Some(bits) = validity.as_mut() {
                            bits.append(true);
                        }
                        // Open a run unless one is already pending.
                        if run_start.is_none() {
                            run_start = Some(item_pos);
                        }
                    } else {
                        // Row without child items.
                        current_row_len = 0;

                        // Copy the valid run that ended just before this row.
                        if let Some(begin) = run_start.take() {
                            child_builder.extend(0, begin, item_pos);
                        }
                        // Keep the child aligned by padding a full row of nulls.
                        child_builder.extend_nulls(self.fixed_size);

                        if let Some(bits) = validity.as_mut() {
                            // A definition level exactly one below `def_level`
                            // is recorded as a present row (an empty list);
                            // anything lower is recorded as null.
                            bits.append(*def + 1 == self.def_level);
                        }
                    }
                    item_pos += 1;
                }
                Ordering::Equal => {
                    // Another item of the current row.
                    item_pos += 1;
                    current_row_len += 1;
                }
                Ordering::Greater => {
                    // Deeper repetition is the inner array's business; only
                    // verify the definition level is consistent with it.
                    if *def < self.def_level {
                        return Err(general_err!(
                            "Encountered repetition level too large for definition level"
                        ));
                    }
                }
            }
        }

        let child_data = if run_start == Some(0) {
            // The only run began at position 0 and was never interrupted, so
            // nothing was padded and the original child array is used as is.
            items.to_data()
        } else {
            // Copy the trailing run of valid rows, if there is one.
            if let Some(begin) = run_start {
                child_builder.extend(0, begin, item_pos);
            }
            child_builder.freeze()
        };

        // The child must hold exactly `fixed_size` entries per row.
        let child_len = child_data.len();
        if row_count * self.fixed_size != child_len {
            return Err(general_err!(
                "fixed-size list length must be a multiple of {} but array contains {} elements",
                self.fixed_size,
                child_len
            ));
        }

        let builder = ArrayData::builder(self.get_data_type().clone())
            .len(row_count)
            .add_child_data(child_data);
        let builder = match validity {
            Some(bits) => builder.null_bit_buffer(Some(bits.into())),
            None => builder,
        };

        // SAFETY: the child length was checked against `row_count * fixed_size`
        // above, and when a validity builder exists exactly one bit was
        // appended per row.
        // NOTE(review): this also relies on `get_data_type()` describing a
        // fixed-size list whose size and child type match the child data —
        // confirm against the constructor.
        let list_data = unsafe { builder.build_unchecked() };

        let list_array = FixedSizeListArray::from(list_data);
        Ok(Arc::new(list_array))
    }