fn write_array_data()

in arrow-ipc/src/writer.rs [1185:1341]


fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
        // where null_count is always 0.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate values
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            let buffer_slice =
                &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
            offset = write_buffer(
                buffer_slice,
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
        } else {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
        }
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(&buffer, buffers, arrow_data, offset, compression_codec)?;
    } else {
        for buffer in array_data.buffers() {
            offset =
                write_buffer(buffer, buffers, arrow_data, offset, compression_codec)?;
        }
    }

    match array_data.data_type() {
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array.
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}