in arrow-ipc/src/reader.rs [81:238]
/// Reads the next array for `field` from the message being decoded.
///
/// Field nodes and buffers are consumed sequentially through `next_node` and
/// `next_buffer`. Nested types first consume their own node and buffers and
/// then recurse into their child fields.
///
/// `variadic_counts` supplies the number of variadic data buffers for each
/// `BinaryView` / `Utf8View` column; one entry is popped from the front for
/// every view column that is read.
///
/// # Errors
///
/// Returns an error when a node or buffer cannot be read, when a view column
/// has a missing or negative variadic count, when a node reports a length
/// that does not fit in `usize`, when a union buffer is shorter than the node
/// length requires, when a dictionary field has no dict id or no matching
/// dictionary, or when a null column's `null_count` differs from its length.
fn create_array(
    &mut self,
    field: &Field,
    variadic_counts: &mut VecDeque<i64>,
) -> Result<ArrayRef, ArrowError> {
    /// Converts a length reported by a field node into `usize`, rejecting
    /// values that do not fit (notably negative lengths) instead of letting
    /// an `as` cast wrap them into a huge length.
    fn node_length(field: &Field, length: i64) -> Result<usize, ArrowError> {
        usize::try_from(length).map_err(|_| {
            ArrowError::IpcError(format!("Field {field} has invalid length {length}"))
        })
    }

    let data_type = field.data_type();
    match data_type {
        Utf8 | Binary | LargeBinary | LargeUtf8 => {
            let field_node = self.next_node(field)?;
            let buffers = [
                self.next_buffer()?,
                self.next_buffer()?,
                self.next_buffer()?,
            ];
            self.create_primitive_array(field_node, data_type, &buffers)
        }
        BinaryView | Utf8View => {
            // Build the error lazily so the success path does not allocate.
            let variadic_count = variadic_counts.pop_front().ok_or_else(|| {
                ArrowError::IpcError(format!("Missing variadic count for {data_type} column"))
            })?;
            // The count comes from the message; a negative value would make
            // us read fewer than the two fixed buffers below.
            if variadic_count < 0 {
                return Err(ArrowError::IpcError(format!(
                    "Invalid variadic count {variadic_count} for {data_type} column"
                )));
            }
            // Two fixed buffers (view and null buffer) plus `variadic_count`
            // data buffers. Reading the fixed pair explicitly avoids the
            // overflow-prone `count + 2`.
            let mut buffers = vec![self.next_buffer()?, self.next_buffer()?];
            for _ in 0..variadic_count {
                buffers.push(self.next_buffer()?);
            }
            let field_node = self.next_node(field)?;
            self.create_primitive_array(field_node, data_type, &buffers)
        }
        FixedSizeBinary(_) => {
            let field_node = self.next_node(field)?;
            let buffers = [self.next_buffer()?, self.next_buffer()?];
            self.create_primitive_array(field_node, data_type, &buffers)
        }
        List(list_field) | LargeList(list_field) | Map(list_field, _) => {
            let list_node = self.next_node(field)?;
            let list_buffers = [self.next_buffer()?, self.next_buffer()?];
            let values = self.create_array(list_field, variadic_counts)?;
            self.create_list_array(list_node, data_type, &list_buffers, values)
        }
        FixedSizeList(list_field, _) => {
            let list_node = self.next_node(field)?;
            let list_buffers = [self.next_buffer()?];
            let values = self.create_array(list_field, variadic_counts)?;
            self.create_list_array(list_node, data_type, &list_buffers, values)
        }
        Struct(struct_fields) => {
            let struct_node = self.next_node(field)?;
            let null_buffer = self.next_buffer()?;

            // read the arrays for each field
            // TODO investigate whether just knowing the number of buffers could
            // still work
            let mut struct_arrays = Vec::with_capacity(struct_fields.len());
            for struct_field in struct_fields {
                struct_arrays.push(self.create_array(struct_field, variadic_counts)?);
            }
            self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
        }
        RunEndEncoded(run_ends_field, values_field) => {
            // Only a node is consumed for the run array itself; the run ends
            // and values are read as ordinary child arrays.
            let run_node = self.next_node(field)?;
            let run_ends = self.create_array(run_ends_field, variadic_counts)?;
            let values = self.create_array(values_field, variadic_counts)?;

            let run_array_length = node_length(field, run_node.length())?;
            let builder = ArrayData::builder(data_type.clone())
                .len(run_array_length)
                .offset(0)
                .add_child_data(run_ends.into_data())
                .add_child_data(values.into_data());
            self.create_array_from_builder(builder)
        }
        // Create dictionary array from RecordBatch
        Dictionary(_, _) => {
            let index_node = self.next_node(field)?;
            let index_buffers = [self.next_buffer()?, self.next_buffer()?];

            // The `allow` silences a deprecation warning raised by this
            // statement (presumably for `Field::dict_id`).
            #[allow(deprecated)]
            let dict_id = field.dict_id().ok_or_else(|| {
                ArrowError::ParseError(format!("Field {field} does not have dict id"))
            })?;

            // The dictionary values must already have been registered under
            // this id.
            let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                ArrowError::ParseError(format!(
                    "Cannot find a dictionary batch with dict id: {dict_id}"
                ))
            })?;

            self.create_dictionary_array(
                index_node,
                data_type,
                &index_buffers,
                value_array.clone(),
            )
        }
        Union(fields, mode) => {
            let union_node = self.next_node(field)?;
            let len = node_length(field, union_node.length())?;

            // In V4, union types has validity bitmap
            // In V5 and later, union types have no validity bitmap
            if self.version < MetadataVersion::V5 {
                self.next_buffer()?;
            }

            // One i8 type id per element. Check the size up front because
            // `slice_with_length` panics on an out-of-range request.
            let type_ids_buffer = self.next_buffer()?;
            let type_ids_available = type_ids_buffer.len();
            if type_ids_available < len {
                return Err(ArrowError::IpcError(format!(
                    "Type ids buffer of union field {field} has {type_ids_available} bytes, expected at least {len}"
                )));
            }
            let type_ids: ScalarBuffer<i8> = type_ids_buffer.slice_with_length(0, len).into();

            let value_offsets = match mode {
                UnionMode::Dense => {
                    let offsets_buffer = self.next_buffer()?;
                    let offsets_available = offsets_buffer.len();
                    // One i32 offset (4 bytes) per element; guard both the
                    // multiplication and the slice bounds.
                    let offsets_bytes = len
                        .checked_mul(4)
                        .filter(|&bytes| bytes <= offsets_available)
                        .ok_or_else(|| {
                            ArrowError::IpcError(format!(
                                "Offsets buffer of union field {field} has {offsets_available} bytes, too short for {len} offsets"
                            ))
                        })?;
                    let offsets: ScalarBuffer<i32> =
                        offsets_buffer.slice_with_length(0, offsets_bytes).into();
                    Some(offsets)
                }
                UnionMode::Sparse => None,
            };

            let mut children = Vec::with_capacity(fields.len());
            for (_id, child_field) in fields.iter() {
                children.push(self.create_array(child_field, variadic_counts)?);
            }

            let array = if self.skip_validation.get() {
                // SAFETY: flag can only be set via unsafe code
                unsafe {
                    UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
                }
            } else {
                UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
            };
            Ok(Arc::new(array))
        }
        Null => {
            let node = self.next_node(field)?;
            let length = node.length();
            let null_count = node.null_count();

            // Every slot of a null array is null, so the two must agree.
            if length != null_count {
                return Err(ArrowError::SchemaError(format!(
                    "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                )));
            }

            let builder = ArrayData::builder(data_type.clone())
                .len(node_length(field, length)?)
                .offset(0);
            self.create_array_from_builder(builder)
        }
        // Every remaining type is read as one node plus two buffers.
        _ => {
            let field_node = self.next_node(field)?;
            let buffers = [self.next_buffer()?, self.next_buffer()?];
            self.create_primitive_array(field_node, data_type, &buffers)
        }
    }
}