in crates/core/src/avro_to_arrow/arrow_array_reader.rs [602:773]
/// Builds one Arrow array per selected field of a struct.
///
/// Each field of `struct_fields` whose name is listed in `projection` (or every
/// field when `projection` is empty) is looked up in every row under its dotted
/// path — `parent_field_name.field_name`, or just the field name when
/// `parent_field_name` is empty — and converted according to the field's Arrow
/// data type. The returned arrays follow the order of `struct_fields`.
///
/// Nested struct fields are handled by recursing into this function with an
/// empty projection, so all of their child fields are always built.
///
/// # Errors
///
/// Returns a `SchemaError` when a field has a data type (or time unit) that is
/// not handled here, or when a struct-typed field holds a value that is neither
/// a record nor null. Errors produced by the helper builders and by the Arrow
/// array constructors are propagated unchanged.
fn build_struct_array(
    &self,
    rows: RecordSlice,
    parent_field_name: &str,
    struct_fields: &Fields,
    projection: &[String],
) -> ArrowResult<Vec<ArrayRef>> {
    struct_fields
        .iter()
        // An empty projection means "keep every field".
        .filter(|field| projection.is_empty() || projection.contains(field.name()))
        .map(|field| {
            // Fully qualified, dot-separated path used for all value lookups.
            let field_path = if parent_field_name.is_empty() {
                field.name().to_string()
            } else {
                format!("{}.{}", parent_field_name, field.name())
            };
            let arr = match field.data_type() {
                DataType::Null => Arc::new(NullArray::new(rows.len())) as ArrayRef,
                DataType::Boolean => self.build_boolean_array(rows, &field_path),
                DataType::Float64 => {
                    self.build_primitive_array::<Float64Type>(rows, &field_path)
                }
                DataType::Float32 => {
                    self.build_primitive_array::<Float32Type>(rows, &field_path)
                }
                DataType::Int64 => self.build_primitive_array::<Int64Type>(rows, &field_path),
                DataType::Int32 => self.build_primitive_array::<Int32Type>(rows, &field_path),
                DataType::Int16 => self.build_primitive_array::<Int16Type>(rows, &field_path),
                DataType::Int8 => self.build_primitive_array::<Int8Type>(rows, &field_path),
                DataType::UInt64 => self.build_primitive_array::<UInt64Type>(rows, &field_path),
                DataType::UInt32 => self.build_primitive_array::<UInt32Type>(rows, &field_path),
                DataType::UInt16 => self.build_primitive_array::<UInt16Type>(rows, &field_path),
                DataType::UInt8 => self.build_primitive_array::<UInt8Type>(rows, &field_path),
                // TODO: this is incomplete
                DataType::Timestamp(unit, _) => match unit {
                    TimeUnit::Second => {
                        self.build_primitive_array::<TimestampSecondType>(rows, &field_path)
                    }
                    TimeUnit::Microsecond => self
                        .build_primitive_array::<TimestampMicrosecondType>(rows, &field_path),
                    TimeUnit::Millisecond => self
                        .build_primitive_array::<TimestampMillisecondType>(rows, &field_path),
                    TimeUnit::Nanosecond => {
                        self.build_primitive_array::<TimestampNanosecondType>(rows, &field_path)
                    }
                },
                DataType::Date64 => self.build_primitive_array::<Date64Type>(rows, &field_path),
                DataType::Date32 => self.build_primitive_array::<Date32Type>(rows, &field_path),
                // Only the microsecond and nanosecond units are handled for Time64.
                DataType::Time64(unit) => match unit {
                    TimeUnit::Microsecond => self
                        .build_primitive_array::<Time64MicrosecondType>(rows, &field_path),
                    TimeUnit::Nanosecond => self
                        .build_primitive_array::<Time64NanosecondType>(rows, &field_path),
                    t => {
                        return Err(SchemaError(format!(
                            "TimeUnit {t:?} not supported with Time64"
                        )))
                    }
                },
                // Only the second and millisecond units are handled for Time32.
                DataType::Time32(unit) => match unit {
                    TimeUnit::Second => {
                        self.build_primitive_array::<Time32SecondType>(rows, &field_path)
                    }
                    TimeUnit::Millisecond => {
                        self.build_primitive_array::<Time32MillisecondType>(rows, &field_path)
                    }
                    t => {
                        return Err(SchemaError(format!(
                            "TimeUnit {t:?} not supported with Time32"
                        )))
                    }
                },
                // NOTE(review): both Utf8 and LargeUtf8 are collected into a
                // `StringArray` — confirm this is intended for LargeUtf8 fields.
                DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
                    rows.iter()
                        .map(|row| {
                            // A missing value becomes a null entry; a present one
                            // is converted (fallibly) by `resolve_string`.
                            match self.field_lookup(&field_path, row) {
                                None => Ok(None),
                                Some(v) => resolve_string(v),
                            }
                        })
                        .collect::<ArrowResult<StringArray>>()?,
                ) as ArrayRef,
                // NOTE(review): both Binary and LargeBinary are collected into a
                // `BinaryArray` — confirm this is intended for LargeBinary fields.
                DataType::Binary | DataType::LargeBinary => Arc::new(
                    rows.iter()
                        .map(|row| self.field_lookup(&field_path, row).and_then(resolve_bytes))
                        .collect::<BinaryArray>(),
                ) as ArrayRef,
                DataType::FixedSizeBinary(ref size) => {
                    Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                        rows.iter().map(|row| {
                            self.field_lookup(&field_path, row)
                                .and_then(|v| resolve_fixed(v, *size as usize))
                        }),
                        *size,
                    )?) as ArrayRef
                }
                DataType::List(ref list_field) => match list_field.data_type() {
                    // Lists of dictionary-encoded items go through a dedicated builder.
                    DataType::Dictionary(ref key_ty, _) => {
                        self.build_wrapped_list_array(rows, &field_path, key_ty)?
                    }
                    _ => {
                        // Extract this field's value from every row, substituting
                        // `Value::Null` when the lookup finds nothing.
                        let extracted_rows = rows
                            .iter()
                            .map(|row| {
                                self.field_lookup(&field_path, row).unwrap_or(&Value::Null)
                            })
                            .collect::<Vec<&Value>>();
                        self.build_nested_list_array::<i32>(
                            &field_path,
                            &extracted_rows,
                            list_field,
                        )?
                    }
                },
                DataType::Dictionary(ref key_ty, ref val_ty) => {
                    self.build_string_dictionary_array(rows, &field_path, key_ty, val_ty)?
                }
                DataType::Struct(fields) => {
                    let len = rows.len();
                    // Validity bitmap, one bit per row, initially all zero (null).
                    let num_bytes = bit_util::ceil(len, 8);
                    let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
                    // Stand-in record for rows where the struct is missing or null,
                    // so the child builders still see one entry per row.
                    let empty_vec = vec![];
                    let struct_rows = rows
                        .iter()
                        .enumerate()
                        .map(|(i, row)| {
                            match self.field_lookup(&field_path, row).map(maybe_resolve_union) {
                                Some(Value::Record(value)) => {
                                    // Mark row `i` as valid (non-null).
                                    bit_util::set_bit(&mut null_buffer, i);
                                    Ok(value)
                                }
                                None | Some(Value::Null) => Ok(&empty_vec),
                                // Unexpected input data is reported as an error
                                // instead of aborting the whole process.
                                other => Err(SchemaError(format!(
                                    "expected struct got {other:?} for field {field_path}"
                                ))),
                            }
                        })
                        .collect::<ArrowResult<Vec<&Vec<(String, Value)>>>>()?;
                    // Recurse with an empty projection: all child fields are built.
                    let arrays =
                        self.build_struct_array(&struct_rows, &field_path, fields, &[])?;
                    // construct a struct array's data in order to set null buffer
                    let data_type = DataType::Struct(fields.clone());
                    let data = ArrayDataBuilder::new(data_type)
                        .len(len)
                        .null_bit_buffer(Some(null_buffer.into()))
                        .child_data(arrays.into_iter().map(|a| a.to_data()).collect())
                        .build()?;
                    make_array(data)
                }
                _ => {
                    return Err(SchemaError(format!(
                        "type {:?} not supported",
                        field.data_type()
                    )))
                }
            };
            Ok(arr)
        })
        // Collecting into `ArrowResult<Vec<_>>` stops at the first failing field.
        .collect()
}