in arrow-ipc/src/reader.rs [71:216]
/// Builds the array described by `field` from the next node(s) and buffer(s)
/// of `reader`, recursing into child fields for nested types.
///
/// Nodes and buffers are consumed strictly in order via `reader.next_node` and
/// `reader.next_buffer`; the number of buffers taken depends on the data type
/// (see the per-arm comments below).
///
/// # Errors
///
/// Returns an [`ArrowError`] when:
/// * `reader.next_node` / `reader.next_buffer` fail,
/// * a dictionary field has no dict id, or no dictionary is registered in
///   `reader.dictionaries_by_id` under that id,
/// * a `Null` node reports a `null_count` different from its `length`,
/// * a union buffer is shorter than the node length requires,
/// * building the underlying `ArrayData` / `UnionArray` fails.
fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, ArrowError> {
    let data_type = field.data_type();
    match data_type {
        // Variable-length binary/string types: one node, three buffers.
        Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
            reader.next_node(field)?,
            data_type,
            &[
                reader.next_buffer()?,
                reader.next_buffer()?,
                reader.next_buffer()?,
            ],
        ),
        // Fixed-width binary: one node, two buffers.
        FixedSizeBinary(_) => create_primitive_array(
            reader.next_node(field)?,
            data_type,
            &[reader.next_buffer()?, reader.next_buffer()?],
        ),
        // List-like types: one node and two buffers, followed by the child array.
        List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
            let list_node = reader.next_node(field)?;
            let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
            let values = create_array(reader, list_field)?;
            create_list_array(list_node, data_type, &list_buffers, values)
        }
        // Fixed-size list: one node and a single buffer, followed by the child array.
        FixedSizeList(ref list_field, _) => {
            let list_node = reader.next_node(field)?;
            let list_buffers = [reader.next_buffer()?];
            let values = create_array(reader, list_field)?;
            create_list_array(list_node, data_type, &list_buffers, values)
        }
        Struct(struct_fields) => {
            let struct_node = reader.next_node(field)?;
            let null_buffer = reader.next_buffer()?;

            // read the arrays for each field
            // TODO investigate whether just knowing the number of buffers could
            // still work
            let mut struct_arrays = Vec::with_capacity(struct_fields.len());
            for struct_field in struct_fields {
                let child = create_array(reader, struct_field)?;
                struct_arrays.push((struct_field.clone(), child));
            }

            // The null buffer is only attached when the node reports nulls.
            let null_count = struct_node.null_count() as usize;
            let struct_array = if null_count > 0 {
                // create struct array from fields, arrays and null data
                StructArray::from((struct_arrays, null_buffer))
            } else {
                StructArray::from(struct_arrays)
            };
            Ok(Arc::new(struct_array))
        }
        // Run-end encoded: the parent node carries no buffers of its own; the
        // run-ends and values children are read recursively.
        RunEndEncoded(run_ends_field, values_field) => {
            let run_node = reader.next_node(field)?;
            let run_ends = create_array(reader, run_ends_field)?;
            let values = create_array(reader, values_field)?;

            let run_array_length = run_node.length() as usize;
            let data = ArrayData::builder(data_type.clone())
                .len(run_array_length)
                .offset(0)
                .add_child_data(run_ends.into_data())
                .add_child_data(values.into_data())
                .build()?;

            Ok(make_array(data))
        }
        // Create dictionary array from RecordBatch
        Dictionary(_, _) => {
            let index_node = reader.next_node(field)?;
            let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];

            let dict_id = field.dict_id().ok_or_else(|| {
                ArrowError::IoError(format!("Field {field} does not have dict id"))
            })?;

            // The dictionary values must already be registered on the reader.
            let value_array =
                reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                    ArrowError::IoError(format!(
                        "Cannot find a dictionary batch with dict id: {dict_id}"
                    ))
                })?;

            create_dictionary_array(
                index_node,
                data_type,
                &index_buffers,
                value_array.clone(),
            )
        }
        Union(fields, mode) => {
            let union_node = reader.next_node(field)?;
            let len = union_node.length() as usize;

            // In V4, union types has validity bitmap
            // In V5 and later, union types have no validity bitmap
            if reader.version < MetadataVersion::V5 {
                reader.next_buffer()?;
            }

            // Validate before slicing so a truncated buffer (or a negative node
            // length that wrapped in the cast above) yields an error, not a panic.
            let type_ids_buffer = reader.next_buffer()?;
            ensure_union_buffer_len(type_ids_buffer.len(), len, "type ids")?;
            let type_ids: Buffer = type_ids_buffer[..len].into();

            let value_offsets = match mode {
                UnionMode::Dense => {
                    let buffer = reader.next_buffer()?;
                    // Four bytes are taken per element for the offsets.
                    let byte_len = len.checked_mul(4).ok_or_else(|| {
                        ArrowError::IoError(format!(
                            "Union value offsets length overflows for {len} elements"
                        ))
                    })?;
                    ensure_union_buffer_len(buffer.len(), byte_len, "value offsets")?;
                    Some(buffer[..byte_len].into())
                }
                UnionMode::Sparse => None,
            };

            let mut children = Vec::with_capacity(fields.len());
            let mut ids = Vec::with_capacity(fields.len());

            for (id, field) in fields.iter() {
                let child = create_array(reader, field)?;
                children.push((field.as_ref().clone(), child));
                ids.push(id);
            }

            let array = UnionArray::try_new(&ids, type_ids, value_offsets, children)?;
            Ok(Arc::new(array))
        }
        Null => {
            let node = reader.next_node(field)?;
            let length = node.length();
            let null_count = node.null_count();

            if length != null_count {
                return Err(ArrowError::IoError(format!(
                    "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                )));
            }

            // Propagate a build failure instead of panicking, matching the
            // RunEndEncoded arm above.
            let data = ArrayData::builder(data_type.clone())
                .len(length as usize)
                .offset(0)
                .build()?;
            // no buffer increases
            Ok(Arc::new(NullArray::from(data)))
        }
        // Every remaining type: one node, two buffers.
        _ => create_primitive_array(
            reader.next_node(field)?,
            data_type,
            &[reader.next_buffer()?, reader.next_buffer()?],
        ),
    }
}

/// Checks that a union buffer holding `available` bytes can supply the
/// `required` leading bytes, returning an `ArrowError::IoError` otherwise.
fn ensure_union_buffer_len(
    available: usize,
    required: usize,
    what: &str,
) -> Result<(), ArrowError> {
    if required > available {
        return Err(ArrowError::IoError(format!(
            "Union {what} buffer too short: need {required} bytes but only {available} available"
        )));
    }
    Ok(())
}