fn build_struct_array()

in crates/core/src/avro_to_arrow/arrow_array_reader.rs [602:773]


    fn build_struct_array(
        &self,
        rows: RecordSlice,
        parent_field_name: &str,
        struct_fields: &Fields,
        projection: &[String],
    ) -> ArrowResult<Vec<ArrayRef>> {
        let arrays: ArrowResult<Vec<ArrayRef>> = struct_fields
            .iter()
            .filter(|field| projection.is_empty() || projection.contains(field.name()))
            .map(|field| {
                let field_path = if parent_field_name.is_empty() {
                    field.name().to_string()
                } else {
                    format!("{}.{}", parent_field_name, field.name())
                };
                let arr = match field.data_type() {
                    DataType::Null => Arc::new(NullArray::new(rows.len())) as ArrayRef,
                    DataType::Boolean => self.build_boolean_array(rows, &field_path),
                    DataType::Float64 => {
                        self.build_primitive_array::<Float64Type>(rows, &field_path)
                    }
                    DataType::Float32 => {
                        self.build_primitive_array::<Float32Type>(rows, &field_path)
                    }
                    DataType::Int64 => self.build_primitive_array::<Int64Type>(rows, &field_path),
                    DataType::Int32 => self.build_primitive_array::<Int32Type>(rows, &field_path),
                    DataType::Int16 => self.build_primitive_array::<Int16Type>(rows, &field_path),
                    DataType::Int8 => self.build_primitive_array::<Int8Type>(rows, &field_path),
                    DataType::UInt64 => self.build_primitive_array::<UInt64Type>(rows, &field_path),
                    DataType::UInt32 => self.build_primitive_array::<UInt32Type>(rows, &field_path),
                    DataType::UInt16 => self.build_primitive_array::<UInt16Type>(rows, &field_path),
                    DataType::UInt8 => self.build_primitive_array::<UInt8Type>(rows, &field_path),
                    // TODO: this is incomplete
                    DataType::Timestamp(unit, _) => match unit {
                        TimeUnit::Second => {
                            self.build_primitive_array::<TimestampSecondType>(rows, &field_path)
                        }
                        TimeUnit::Microsecond => self
                            .build_primitive_array::<TimestampMicrosecondType>(rows, &field_path),
                        TimeUnit::Millisecond => self
                            .build_primitive_array::<TimestampMillisecondType>(rows, &field_path),
                        TimeUnit::Nanosecond => {
                            self.build_primitive_array::<TimestampNanosecondType>(rows, &field_path)
                        }
                    },
                    DataType::Date64 => self.build_primitive_array::<Date64Type>(rows, &field_path),
                    DataType::Date32 => self.build_primitive_array::<Date32Type>(rows, &field_path),
                    DataType::Time64(unit) => {
                        match unit {
                            TimeUnit::Microsecond => self
                                .build_primitive_array::<Time64MicrosecondType>(rows, &field_path),
                            TimeUnit::Nanosecond => self
                                .build_primitive_array::<Time64NanosecondType>(rows, &field_path),
                            t => {
                                return Err(SchemaError(format!(
                                    "TimeUnit {t:?} not supported with Time64"
                                )))
                            }
                        }
                    }
                    DataType::Time32(unit) => match unit {
                        TimeUnit::Second => {
                            self.build_primitive_array::<Time32SecondType>(rows, &field_path)
                        }
                        TimeUnit::Millisecond => {
                            self.build_primitive_array::<Time32MillisecondType>(rows, &field_path)
                        }
                        t => {
                            return Err(SchemaError(format!(
                                "TimeUnit {t:?} not supported with Time32"
                            )))
                        }
                    },
                    DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
                        rows.iter()
                            .map(|row| {
                                let maybe_value = self.field_lookup(&field_path, row);
                                match maybe_value {
                                    None => Ok(None),
                                    Some(v) => resolve_string(v),
                                }
                            })
                            .collect::<ArrowResult<StringArray>>()?,
                    ) as ArrayRef,
                    DataType::Binary | DataType::LargeBinary => Arc::new(
                        rows.iter()
                            .map(|row| {
                                let maybe_value = self.field_lookup(&field_path, row);
                                maybe_value.and_then(resolve_bytes)
                            })
                            .collect::<BinaryArray>(),
                    ) as ArrayRef,
                    DataType::FixedSizeBinary(ref size) => {
                        Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                            rows.iter().map(|row| {
                                let maybe_value = self.field_lookup(&field_path, row);
                                maybe_value.and_then(|v| resolve_fixed(v, *size as usize))
                            }),
                            *size,
                        )?) as ArrayRef
                    }
                    DataType::List(ref list_field) => {
                        match list_field.data_type() {
                            DataType::Dictionary(ref key_ty, _) => {
                                self.build_wrapped_list_array(rows, &field_path, key_ty)?
                            }
                            _ => {
                                // extract rows by name
                                let extracted_rows = rows
                                    .iter()
                                    .map(|row| {
                                        self.field_lookup(&field_path, row).unwrap_or(&Value::Null)
                                    })
                                    .collect::<Vec<&Value>>();
                                self.build_nested_list_array::<i32>(
                                    &field_path,
                                    &extracted_rows,
                                    list_field,
                                )?
                            }
                        }
                    }
                    DataType::Dictionary(ref key_ty, ref val_ty) => {
                        self.build_string_dictionary_array(rows, &field_path, key_ty, val_ty)?
                    }
                    DataType::Struct(fields) => {
                        let len = rows.len();
                        let num_bytes = bit_util::ceil(len, 8);
                        let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
                        let empty_vec = vec![];
                        let struct_rows = rows
                            .iter()
                            .enumerate()
                            .map(|(i, row)| (i, self.field_lookup(&field_path, row)))
                            .map(|(i, v)| {
                                let v = v.map(maybe_resolve_union);
                                match v {
                                    Some(Value::Record(value)) => {
                                        bit_util::set_bit(&mut null_buffer, i);
                                        value
                                    }
                                    None | Some(Value::Null) => &empty_vec,
                                    other => {
                                        panic!("expected struct got {other:?}");
                                    }
                                }
                            })
                            .collect::<Vec<&Vec<(String, Value)>>>();
                        let arrays =
                            self.build_struct_array(&struct_rows, &field_path, fields, &[])?;
                        // construct a struct array's data in order to set null buffer
                        let data_type = DataType::Struct(fields.clone());
                        let data = ArrayDataBuilder::new(data_type)
                            .len(len)
                            .null_bit_buffer(Some(null_buffer.into()))
                            .child_data(arrays.into_iter().map(|a| a.to_data()).collect())
                            .build()?;
                        make_array(data)
                    }
                    _ => {
                        return Err(SchemaError(format!(
                            "type {:?} not supported",
                            field.data_type()
                        )))
                    }
                };
                Ok(arr)
            })
            .collect();
        arrays
    }