fn build_struct_array()

in datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs [631:842]


    fn build_struct_array(
        &self,
        rows: RecordSlice,
        parent_field_name: &str,
        struct_fields: &Fields,
    ) -> ArrowResult<Vec<ArrayRef>> {
        let arrays: ArrowResult<Vec<ArrayRef>> = struct_fields
            .iter()
            .map(|field| {
                let field_path = if parent_field_name.is_empty() {
                    field.name().to_string()
                } else {
                    format!("{}.{}", parent_field_name, field.name())
                };
                let arr = match field.data_type() {
                    DataType::Null => Arc::new(NullArray::new(rows.len())) as ArrayRef,
                    DataType::Boolean => self.build_boolean_array(rows, &field_path),
                    DataType::Float64 => {
                        self.build_primitive_array::<Float64Type>(rows, &field_path)
                    }
                    DataType::Float32 => {
                        self.build_primitive_array::<Float32Type>(rows, &field_path)
                    }
                    DataType::Int64 => {
                        self.build_primitive_array::<Int64Type>(rows, &field_path)
                    }
                    DataType::Int32 => {
                        self.build_primitive_array::<Int32Type>(rows, &field_path)
                    }
                    DataType::Int16 => {
                        self.build_primitive_array::<Int16Type>(rows, &field_path)
                    }
                    DataType::Int8 => {
                        self.build_primitive_array::<Int8Type>(rows, &field_path)
                    }
                    DataType::UInt64 => {
                        self.build_primitive_array::<UInt64Type>(rows, &field_path)
                    }
                    DataType::UInt32 => {
                        self.build_primitive_array::<UInt32Type>(rows, &field_path)
                    }
                    DataType::UInt16 => {
                        self.build_primitive_array::<UInt16Type>(rows, &field_path)
                    }
                    DataType::UInt8 => {
                        self.build_primitive_array::<UInt8Type>(rows, &field_path)
                    }
                    // TODO: this is incomplete
                    DataType::Timestamp(unit, _) => match unit {
                        TimeUnit::Second => self
                            .build_primitive_array::<TimestampSecondType>(
                                rows,
                                &field_path,
                            ),
                        TimeUnit::Microsecond => self
                            .build_primitive_array::<TimestampMicrosecondType>(
                                rows,
                                &field_path,
                            ),
                        TimeUnit::Millisecond => self
                            .build_primitive_array::<TimestampMillisecondType>(
                                rows,
                                &field_path,
                            ),
                        TimeUnit::Nanosecond => self
                            .build_primitive_array::<TimestampNanosecondType>(
                                rows,
                                &field_path,
                            ),
                    },
                    DataType::Date64 => {
                        self.build_primitive_array::<Date64Type>(rows, &field_path)
                    }
                    DataType::Date32 => {
                        self.build_primitive_array::<Date32Type>(rows, &field_path)
                    }
                    DataType::Time64(unit) => match unit {
                        TimeUnit::Microsecond => self
                            .build_primitive_array::<Time64MicrosecondType>(
                                rows,
                                &field_path,
                            ),
                        TimeUnit::Nanosecond => self
                            .build_primitive_array::<Time64NanosecondType>(
                                rows,
                                &field_path,
                            ),
                        t => {
                            return Err(SchemaError(format!(
                                "TimeUnit {t:?} not supported with Time64"
                            )))
                        }
                    },
                    DataType::Time32(unit) => match unit {
                        TimeUnit::Second => self
                            .build_primitive_array::<Time32SecondType>(rows, &field_path),
                        TimeUnit::Millisecond => self
                            .build_primitive_array::<Time32MillisecondType>(
                                rows,
                                &field_path,
                            ),
                        t => {
                            return Err(SchemaError(format!(
                                "TimeUnit {t:?} not supported with Time32"
                            )))
                        }
                    },
                    DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
                        rows.iter()
                            .map(|row| {
                                let maybe_value = self.field_lookup(&field_path, row);
                                match maybe_value {
                                    None => Ok(None),
                                    Some(v) => resolve_string(v),
                                }
                            })
                            .collect::<ArrowResult<StringArray>>()?,
                    )
                        as ArrayRef,
                    DataType::Binary | DataType::LargeBinary => Arc::new(
                        rows.iter()
                            .map(|row| {
                                let maybe_value = self.field_lookup(&field_path, row);
                                maybe_value.and_then(resolve_bytes)
                            })
                            .collect::<BinaryArray>(),
                    )
                        as ArrayRef,
                    DataType::FixedSizeBinary(ref size) => {
                        Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                            rows.iter().map(|row| {
                                let maybe_value = self.field_lookup(&field_path, row);
                                maybe_value.and_then(|v| resolve_fixed(v, *size as usize))
                            }),
                            *size,
                        )?) as ArrayRef
                    }
                    DataType::List(ref list_field) => {
                        match list_field.data_type() {
                            DataType::Dictionary(ref key_ty, _) => {
                                self.build_wrapped_list_array(rows, &field_path, key_ty)?
                            }
                            _ => {
                                // extract rows by name
                                let extracted_rows = rows
                                    .iter()
                                    .map(|row| {
                                        self.field_lookup(&field_path, row)
                                            .unwrap_or(&Value::Null)
                                    })
                                    .collect::<Vec<&Value>>();
                                self.build_nested_list_array::<i32>(
                                    &field_path,
                                    &extracted_rows,
                                    list_field,
                                )?
                            }
                        }
                    }
                    DataType::Dictionary(ref key_ty, ref val_ty) => self
                        .build_string_dictionary_array(
                            rows,
                            &field_path,
                            key_ty,
                            val_ty,
                        )?,
                    DataType::Struct(fields) => {
                        let len = rows.len();
                        let num_bytes = bit_util::ceil(len, 8);
                        let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
                        let empty_vec = vec![];
                        let struct_rows = rows
                            .iter()
                            .enumerate()
                            .map(|(i, row)| (i, self.field_lookup(&field_path, row)))
                            .map(|(i, v)| {
                                let v = v.map(maybe_resolve_union);
                                match v {
                                    Some(Value::Record(value)) => {
                                        bit_util::set_bit(&mut null_buffer, i);
                                        value
                                    }
                                    None | Some(Value::Null) => &empty_vec,
                                    other => {
                                        panic!("expected struct got {other:?}");
                                    }
                                }
                            })
                            .collect::<Vec<&Vec<(String, Value)>>>();
                        let arrays =
                            self.build_struct_array(&struct_rows, &field_path, fields)?;
                        // construct a struct array's data in order to set null buffer
                        let data_type = DataType::Struct(fields.clone());
                        let data = ArrayDataBuilder::new(data_type)
                            .len(len)
                            .null_bit_buffer(Some(null_buffer.into()))
                            .child_data(arrays.into_iter().map(|a| a.to_data()).collect())
                            .build()?;
                        make_array(data)
                    }
                    _ => {
                        return Err(SchemaError(format!(
                            "type {:?} not supported",
                            field.data_type()
                        )))
                    }
                };
                Ok(arr)
            })
            .collect();
        arrays
    }