fn write_array_data()

in arrow-ipc/src/writer.rs [1566:1803]


/// Serializes `array_data` into the IPC message body, appending its raw
/// bytes to `arrow_data` and recording the corresponding buffer and field
/// node metadata.
///
/// Pushes one `FieldNode` (length / null count) for this array, then writes
/// its buffers in the order mandated by the array's layout — slicing or
/// truncating buffers where safe so that bytes outside the array's logical
/// window are not written — and finally recurses into child arrays (with
/// two exceptions handled in the trailing `match`). Returns the updated
/// byte offset into the body so the caller can continue appending.
///
/// Arguments:
/// * `buffers`    - output: one metadata entry (offset/length) per buffer written
/// * `arrow_data` - output: the accumulated raw body bytes
/// * `nodes`      - output: field node metadata, filled in pre-order via recursion
/// * `offset`     - current byte offset into the body
/// * `num_rows` / `null_count` - the parent's view of this array's length and
///   null count (may differ from `array_data`'s own values, e.g. for NullArray)
/// * `compression_codec` - optional codec forwarded to `write_buffer`
///
/// Errors: propagates any `ArrowError` from `write_buffer` (e.g. compression
/// failure) or from `unslice_run_array`.
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
        // where null_count is always 0.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if exists
        let null_buffer = match array_data.nulls() {
            None => {
                // No explicit null buffer: create a buffer and fill it with valid bits,
                // since the layout requires a validity bitmap to be present.
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            // `sliced()` materializes just the logical window of the bitmap.
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        // Variable-width byte arrays with i32 offsets: the helper presumably
        // returns offsets/values truncated to the array's slice (see
        // `get_byte_array_buffers`). Offsets are written before values.
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Slicing the views buffer is safe and easy,
        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
        //
        // Current implementation just serialize the raw arrays as given and not try to optimize anything.
        // If users wants to "compact" the arrays prior to sending them over IPC,
        // they should consider the gc API suggested in #5513
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        // Same as Binary/Utf8 above, but with i64 offsets.
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Fixed-width primitive types (for Dictionary, this is the keys
        // buffer): a single values buffer. Truncate values
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        // Only len * byte_width bytes starting at the array's element offset
        // are logically needed; skip leading/trailing bytes when possible.
        let min_length = array_data.len() * byte_width;
        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            // `min` guards against the physical buffer being shorter than
            // the full logical window.
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
        } else {
            buffer.as_slice()
        };
        offset = write_buffer(
            buffer_slice,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        // `bit_slice` re-packs the bits so the result starts on a byte boundary.
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        // List-like types have exactly one offsets buffer and one child array.
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate offsets and the child data to avoid writing unnecessary data
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            // Map uses i32 offsets, like List.
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
        // Recurse into the (already sliced) child, then return early so the
        // generic child recursion at the bottom doesn't write it twice.
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        // FixedSizeList has no offsets buffer; the child window is computed
        // directly from the parent's offset/len scaled by the fixed size.
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        // As with List above, recurse here and return early to avoid the
        // generic child recursion below.
        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else {
        // All remaining types (e.g. Struct, Union, RunEndEncoded, ...):
        // write whatever buffers the array carries, unmodified.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    }

    // Child handling for the branches that did not early-return above.
    match array_data.data_type() {
        // Dictionary values are intentionally NOT recursed into here —
        // presumably they are emitted separately as dictionary batches
        // elsewhere in the writer (per the Arrow IPC format); verify against
        // the caller if changing this.
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array, so the full (unsliced) run-ends
            // and values children are what gets serialized.
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}