in crates/core/src/avro_to_arrow/arrow_array_reader.rs [425:592]
/// Builds a list array from `rows`, with items typed by `list_field`.
///
/// The outer type is `List` when `OffsetSize` is `i32` and `LargeList` when it is `i64`.
///
/// Each row is first unwrapped with `maybe_resolve_union`, then:
/// * `Value::Array(a)` becomes a valid list slot holding `a.len()` child items;
/// * `Value::Null` becomes a null list slot with no child items;
/// * any other value occupies exactly one child item, while its list slot stays null.
///
/// `parent_field_name` is only used to derive the field path passed to
/// `build_struct_array` (and to nested list builds) when items are structs or lists.
///
/// # Errors
///
/// Returns `SchemaError` when:
/// * the item type is unsupported (`Float16`, temporal types, or any type not matched below);
/// * the total number of child items does not fit in `OffsetSize`;
/// * a struct list contains an item that is neither a record nor null.
///
/// Errors from nested builds and from `ArrayData` validation are propagated.
fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
    &self,
    parent_field_name: &str,
    rows: &[&Value],
    list_field: &Field,
) -> ArrowResult<ArrayRef> {
    // build list offsets and list validity
    let list_len = rows.len();
    let mut offsets = Vec::with_capacity(list_len + 1);
    let mut list_nulls = MutableBuffer::from_len_zeroed(bit_util::ceil(list_len, 8));
    offsets.push(OffsetSize::zero());
    // Accumulate in `usize` and convert per row. An offset that does not fit in
    // `OffsetSize` (e.g. more than i32::MAX items for `List`) then becomes an error
    // instead of a panic.
    let mut cur_offset: usize = 0;
    for (i, v) in rows.iter().enumerate() {
        // TODO: unboxing Union(Array(Union(...))) should probably be done earlier
        match maybe_resolve_union(v) {
            Value::Array(a) => {
                cur_offset += a.len();
                bit_util::set_bit(&mut list_nulls, i);
            }
            // value is null, no child items
            Value::Null => {}
            // NOTE(review): a non-list scalar occupies one child slot, but its list
            // slot is left null, so the item is not reachable through the list.
            // Preserved as-is; confirm whether this is intended.
            _ => cur_offset += 1,
        }
        let offset = OffsetSize::from_usize(cur_offset).ok_or_else(|| {
            SchemaError(format!(
                "Nested list offset {cur_offset} does not fit in the list offset type"
            ))
        })?;
        offsets.push(offset);
    }
    // Total number of child items referenced by the offsets.
    let valid_len = cur_offset;

    let array_data = match list_field.data_type() {
        DataType::Null => NullArray::new(valid_len).into_data(),
        DataType::Boolean => {
            let num_bytes = bit_util::ceil(valid_len, 8);
            let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
            let mut bool_nulls = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
            // Visit the child items in exactly the order the offsets count them:
            // list rows yield their items, null rows yield nothing, and any other
            // scalar yields itself. This keeps item `k` aligned with offset slot `k`.
            let items = rows.iter().flat_map(|row| {
                let row_items: &[Value] = match maybe_resolve_union(row) {
                    Value::Array(vs) => vs,
                    Value::Null => &[],
                    scalar => std::slice::from_ref(scalar),
                };
                row_items
            });
            for (curr_index, value) in items.enumerate() {
                // Items of a nullable item type arrive wrapped in a union.
                match maybe_resolve_union(value) {
                    Value::Boolean(true) => bit_util::set_bit(&mut bool_values, curr_index),
                    Value::Boolean(false) => {}
                    // null slot
                    _ => bit_util::unset_bit(&mut bool_nulls, curr_index),
                }
            }
            ArrayData::builder(list_field.data_type().clone())
                .len(valid_len)
                .add_buffer(bool_values.into())
                .null_bit_buffer(Some(bool_nulls.into()))
                .build()?
        }
        DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
        DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
        DataType::Int32 => self.read_primitive_list_values::<Int32Type>(rows),
        DataType::Int64 => self.read_primitive_list_values::<Int64Type>(rows),
        DataType::UInt8 => self.read_primitive_list_values::<UInt8Type>(rows),
        DataType::UInt16 => self.read_primitive_list_values::<UInt16Type>(rows),
        DataType::UInt32 => self.read_primitive_list_values::<UInt32Type>(rows),
        DataType::UInt64 => self.read_primitive_list_values::<UInt64Type>(rows),
        DataType::Float16 => return Err(SchemaError("Float16 not supported".to_string())),
        DataType::Float32 => self.read_primitive_list_values::<Float32Type>(rows),
        DataType::Float64 => self.read_primitive_list_values::<Float64Type>(rows),
        DataType::Timestamp(_, _)
        | DataType::Date32
        | DataType::Date64
        | DataType::Time32(_)
        | DataType::Time64(_) => {
            return Err(SchemaError(
                "Temporal types are not yet supported, see ARROW-4803".to_string(),
            ))
        }
        DataType::Utf8 => flatten_string_values(rows)
            .into_iter()
            .collect::<StringArray>()
            .into_data(),
        DataType::LargeUtf8 => flatten_string_values(rows)
            .into_iter()
            .collect::<LargeStringArray>()
            .into_data(),
        DataType::List(field) => {
            let child = self.build_nested_list_array::<i32>(
                parent_field_name,
                &flatten_values(rows),
                field,
            )?;
            child.to_data()
        }
        DataType::LargeList(field) => {
            let child = self.build_nested_list_array::<i64>(
                parent_field_name,
                &flatten_values(rows),
                field,
            )?;
            child.to_data()
        }
        DataType::Struct(fields) => {
            // One struct slot per child item, following the same row interpretation
            // as the offsets. Null rows contribute no slots, so the child array has
            // exactly `valid_len` entries and stays aligned with the offsets.
            let mut null_buffer = MutableBuffer::from_len_zeroed(bit_util::ceil(valid_len, 8));
            let null_struct_array = vec![("null".to_string(), Value::Null)];
            let mut struct_rows: Vec<&Vec<(String, Value)>> = Vec::with_capacity(valid_len);
            for row in rows {
                match maybe_resolve_union(row) {
                    Value::Array(values) => {
                        for value in values {
                            match maybe_resolve_union(value) {
                                Value::Record(record) => {
                                    bit_util::set_bit(&mut null_buffer, struct_rows.len());
                                    struct_rows.push(record);
                                }
                                Value::Null => struct_rows.push(&null_struct_array),
                                other => {
                                    return Err(SchemaError(format!(
                                        "expected Record, got {other:?}"
                                    )))
                                }
                            }
                        }
                    }
                    // null list: no child items (matches the offsets)
                    Value::Null => {}
                    // non-list scalar: one child item, recorded as a null struct
                    _ => struct_rows.push(&null_struct_array),
                }
            }
            let sub_parent_field_name = format!("{}.{}", parent_field_name, list_field.name());
            let arrays =
                self.build_struct_array(&struct_rows, &sub_parent_field_name, fields, &[])?;
            ArrayDataBuilder::new(DataType::Struct(fields.clone()))
                .len(struct_rows.len())
                .null_bit_buffer(Some(null_buffer.into()))
                .child_data(arrays.into_iter().map(|a| a.to_data()).collect())
                .build()?
        }
        datatype => {
            return Err(SchemaError(format!(
                "Nested list of {datatype:?} not supported"
            )));
        }
    };

    // build list; the outer data type must agree with `OffsetSize`, otherwise
    // `GenericListArray::<i64>` would receive a `List`-typed array.
    let item_field = Arc::new(list_field.clone());
    let list_type = if OffsetSize::IS_LARGE {
        DataType::LargeList(item_field)
    } else {
        DataType::List(item_field)
    };
    let list_data = ArrayData::builder(list_type)
        .len(list_len)
        .add_buffer(Buffer::from_slice_ref(&offsets))
        .add_child_data(array_data)
        .null_bit_buffer(Some(list_nulls.into()))
        .build()?;
    Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
}