in arrow-ipc/src/writer.rs [1566:1803]
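/// Writes the buffers and field nodes for `array_data` into `arrow_data`:
/// pushes a `FieldNode` for the array, writes its validity and value buffers
/// (recording their locations in `buffers`), recurses into any child data,
/// and returns the offset at which the next buffer should be written.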
fn write_array_data(
array_data: &ArrayData,
buffers: &mut Vec<crate::Buffer>,
arrow_data: &mut Vec<u8>,
nodes: &mut Vec<crate::FieldNode>,
offset: i64,
num_rows: usize,
null_count: usize,
compression_codec: Option<CompressionCodec>,
write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
let mut offset = offset;
if !matches!(array_data.data_type(), DataType::Null) {
nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
} else {
        // A NullArray's null_count equals its len, but the `null_count` passed in
        // comes from ArrayData, where it is always 0.
nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
}
if has_validity_bitmap(array_data.data_type(), write_options) {
        // write the null (validity) buffer if one exists
let null_buffer = match array_data.nulls() {
None => {
// create a buffer and fill it with valid bits
let num_bytes = bit_util::ceil(num_rows, 8);
let buffer = MutableBuffer::new(num_bytes);
let buffer = buffer.with_bitset(num_bytes, true);
buffer.into()
}
Some(buffer) => buffer.inner().sliced(),
};
offset = write_buffer(
null_buffer.as_slice(),
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
}
let data_type = array_data.data_type();
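    // Write the value buffers. Each family of types lays out its buffers differently,
    // so slicing/truncation is handled per type in the branches below.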
if matches!(data_type, DataType::Binary | DataType::Utf8) {
let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
for buffer in [offsets, values] {
offset = write_buffer(
buffer.as_slice(),
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
}
} else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Slicing the views buffer is safe and easy, but pruning unneeded data buffers
        // is much more nuanced, since it is hard to prove that no views reference the
        // pruned buffers.
        //
        // The current implementation just serializes the raw arrays as given and does
        // not try to optimize anything. If users want to "compact" the arrays prior to
        // sending them over IPC, they should consider the gc API suggested in #5513.
for buffer in array_data.buffers() {
offset = write_buffer(
buffer.as_slice(),
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
}
} else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
for buffer in [offsets, values] {
offset = write_buffer(
buffer.as_slice(),
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
}
} else if DataType::is_numeric(data_type)
|| DataType::is_temporal(data_type)
|| matches!(
array_data.data_type(),
DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
)
{
        // Fixed-width values: truncate the single value buffer to this array's
        // offset/length window.
assert_eq!(array_data.buffers().len(), 1);
let buffer = &array_data.buffers()[0];
let layout = layout(data_type);
let spec = &layout.buffers[0];
let byte_width = get_buffer_element_width(spec);
let min_length = array_data.len() * byte_width;
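        // Only write the bytes this array actually needs; a sliced or over-allocated
        // backing buffer may be larger than `min_length`.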
let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
let byte_offset = array_data.offset() * byte_width;
let buffer_length = min(min_length, buffer.len() - byte_offset);
&buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
} else {
buffer.as_slice()
};
offset = write_buffer(
buffer_slice,
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
} else if matches!(data_type, DataType::Boolean) {
// Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
// The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
assert_eq!(array_data.buffers().len(), 1);
let buffer = &array_data.buffers()[0];
let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
offset = write_buffer(
&buffer,
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
} else if matches!(
data_type,
DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
) {
assert_eq!(array_data.buffers().len(), 1);
assert_eq!(array_data.child_data().len(), 1);
// Truncate offsets and the child data to avoid writing unnecessary data
let (offsets, sliced_child_data) = match data_type {
DataType::List(_) => get_list_array_buffers::<i32>(array_data),
DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
_ => unreachable!(),
};
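        // Write the truncated offsets buffer, then recurse into the sliced child data.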
offset = write_buffer(
offsets.as_slice(),
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
offset = write_array_data(
&sliced_child_data,
buffers,
arrow_data,
nodes,
offset,
sliced_child_data.len(),
sliced_child_data.null_count(),
compression_codec,
write_options,
)?;
return Ok(offset);
} else if let DataType::FixedSizeList(_, fixed_size) = data_type {
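        // A FixedSizeList has no offsets buffer: each parent element maps to exactly
        // `fixed_size` child elements, so slice the child data directly.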
assert_eq!(array_data.child_data().len(), 1);
let fixed_size = *fixed_size as usize;
let child_offset = array_data.offset() * fixed_size;
let child_length = array_data.len() * fixed_size;
let child_data = array_data.child_data()[0].slice(child_offset, child_length);
offset = write_array_data(
&child_data,
buffers,
arrow_data,
nodes,
offset,
child_data.len(),
child_data.null_count(),
compression_codec,
write_options,
)?;
return Ok(offset);
} else {
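        // Remaining types: write any value buffers without truncation.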
for buffer in array_data.buffers() {
offset = write_buffer(
buffer,
buffers,
arrow_data,
offset,
compression_codec,
write_options.alignment,
)?;
}
}
match array_data.data_type() {
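        // Dictionary values are serialized separately as dictionary batches, so only
        // the keys buffer (already written above) belongs to this record batch.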
DataType::Dictionary(_, _) => {}
DataType::RunEndEncoded(_, _) => {
// unslice the run encoded array.
let arr = unslice_run_array(array_data.clone())?;
// recursively write out nested structures
for data_ref in arr.child_data() {
                // write the nested data (e.g. list data)
offset = write_array_data(
data_ref,
buffers,
arrow_data,
nodes,
offset,
data_ref.len(),
data_ref.null_count(),
compression_codec,
write_options,
)?;
}
}
_ => {
// recursively write out nested structures
for data_ref in array_data.child_data() {
                // write the nested data (e.g. list data)
offset = write_array_data(
data_ref,
buffers,
arrow_data,
nodes,
offset,
data_ref.len(),
data_ref.null_count(),
compression_codec,
write_options,
)?;
}
}
}
Ok(offset)
}