fn build_nested_list_array()

in crates/core/src/avro_to_arrow/arrow_array_reader.rs [425:592]


    fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
        &self,
        parent_field_name: &str,
        rows: &[&Value],
        list_field: &Field,
    ) -> ArrowResult<ArrayRef> {
        // build list offsets
        let mut cur_offset = OffsetSize::zero();
        let list_len = rows.len();
        let num_list_bytes = bit_util::ceil(list_len, 8);
        let mut offsets = Vec::with_capacity(list_len + 1);
        let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes);
        offsets.push(cur_offset);
        rows.iter().enumerate().for_each(|(i, v)| {
            // TODO: unboxing Union(Array(Union(...))) should probably be done earlier
            let v = maybe_resolve_union(v);
            if let Value::Array(a) = v {
                cur_offset += OffsetSize::from_usize(a.len()).unwrap();
                bit_util::set_bit(&mut list_nulls, i);
            } else if let Value::Null = v {
                // value is null, not incremented
            } else {
                cur_offset += OffsetSize::one();
            }
            offsets.push(cur_offset);
        });
        let valid_len = cur_offset.to_usize().unwrap();
        let array_data = match list_field.data_type() {
            DataType::Null => NullArray::new(valid_len).into_data(),
            DataType::Boolean => {
                let num_bytes = bit_util::ceil(valid_len, 8);
                let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
                let mut bool_nulls = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
                let mut curr_index = 0;
                rows.iter().for_each(|v| {
                    if let Value::Array(vs) = v {
                        vs.iter().for_each(|value| {
                            if let Value::Boolean(child) = value {
                                // if valid boolean, append value
                                if *child {
                                    bit_util::set_bit(&mut bool_values, curr_index);
                                }
                            } else {
                                // null slot
                                bit_util::unset_bit(&mut bool_nulls, curr_index);
                            }
                            curr_index += 1;
                        });
                    }
                });
                ArrayData::builder(list_field.data_type().clone())
                    .len(valid_len)
                    .add_buffer(bool_values.into())
                    .null_bit_buffer(Some(bool_nulls.into()))
                    .build()
                    .unwrap()
            }
            DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
            DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
            DataType::Int32 => self.read_primitive_list_values::<Int32Type>(rows),
            DataType::Int64 => self.read_primitive_list_values::<Int64Type>(rows),
            DataType::UInt8 => self.read_primitive_list_values::<UInt8Type>(rows),
            DataType::UInt16 => self.read_primitive_list_values::<UInt16Type>(rows),
            DataType::UInt32 => self.read_primitive_list_values::<UInt32Type>(rows),
            DataType::UInt64 => self.read_primitive_list_values::<UInt64Type>(rows),
            DataType::Float16 => return Err(SchemaError("Float16 not supported".to_string())),
            DataType::Float32 => self.read_primitive_list_values::<Float32Type>(rows),
            DataType::Float64 => self.read_primitive_list_values::<Float64Type>(rows),
            DataType::Timestamp(_, _)
            | DataType::Date32
            | DataType::Date64
            | DataType::Time32(_)
            | DataType::Time64(_) => {
                return Err(SchemaError(
                    "Temporal types are not yet supported, see ARROW-4803".to_string(),
                ))
            }
            DataType::Utf8 => flatten_string_values(rows)
                .into_iter()
                .collect::<StringArray>()
                .into_data(),
            DataType::LargeUtf8 => flatten_string_values(rows)
                .into_iter()
                .collect::<LargeStringArray>()
                .into_data(),
            DataType::List(field) => {
                let child = self.build_nested_list_array::<i32>(
                    parent_field_name,
                    &flatten_values(rows),
                    field,
                )?;
                child.to_data()
            }
            DataType::LargeList(field) => {
                let child = self.build_nested_list_array::<i64>(
                    parent_field_name,
                    &flatten_values(rows),
                    field,
                )?;
                child.to_data()
            }
            DataType::Struct(fields) => {
                // extract list values, with non-lists converted to Value::Null
                let array_item_count = rows
                    .iter()
                    .map(|row| match maybe_resolve_union(row) {
                        Value::Array(values) => values.len(),
                        _ => 1,
                    })
                    .sum();
                let num_bytes = bit_util::ceil(array_item_count, 8);
                let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
                let mut struct_index = 0;
                let null_struct_array = vec![("null".to_string(), Value::Null)];
                let rows: Vec<&Vec<(String, Value)>> = rows
                    .iter()
                    .map(|v| maybe_resolve_union(v))
                    .flat_map(|row| {
                        if let Value::Array(values) = row {
                            values
                                .iter()
                                .map(maybe_resolve_union)
                                .map(|v| match v {
                                    Value::Record(record) => {
                                        bit_util::set_bit(&mut null_buffer, struct_index);
                                        struct_index += 1;
                                        record
                                    }
                                    Value::Null => {
                                        struct_index += 1;
                                        &null_struct_array
                                    }
                                    other => panic!("expected Record, got {other:?}"),
                                })
                                .collect::<Vec<&Vec<(String, Value)>>>()
                        } else {
                            struct_index += 1;
                            vec![&null_struct_array]
                        }
                    })
                    .collect();

                let sub_parent_field_name = format!("{}.{}", parent_field_name, list_field.name());
                let arrays = self.build_struct_array(&rows, &sub_parent_field_name, fields, &[])?;
                let data_type = DataType::Struct(fields.clone());
                ArrayDataBuilder::new(data_type)
                    .len(rows.len())
                    .null_bit_buffer(Some(null_buffer.into()))
                    .child_data(arrays.into_iter().map(|a| a.to_data()).collect())
                    .build()
                    .unwrap()
            }
            datatype => {
                return Err(SchemaError(format!(
                    "Nested list of {datatype:?} not supported"
                )));
            }
        };
        // build list
        let list_data = ArrayData::builder(DataType::List(Arc::new(list_field.clone())))
            .len(list_len)
            .add_buffer(Buffer::from_slice_ref(&offsets))
            .add_child_data(array_data)
            .null_bit_buffer(Some(list_nulls.into()))
            .build()
            .unwrap();
        Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
    }