fn general_array_slice()

in datafusion/functions-nested/src/extract.rs [459:648]


/// Slices every list of `array` row by row and returns the result as a new list array.
///
/// For row `i` the slice is described by `from_array[i]`, `to_array[i]` and, when `stride` is
/// provided, `stride[i]` (a missing `stride` array means a stride of 1). Both indices are
/// 1-based and inclusive; negative indices count back from the end of the list. The exact
/// normalization rules live in the nested helpers `adjusted_from_index` and
/// `adjusted_to_index`, which follow DuckDB's `array_slice` conventions.
///
/// Row-level results:
/// * If the list, `from`, `to`, or `stride` is null for a row, the output row is null
///   (it occupies a single null slot in the child values).
/// * An empty input list always yields an empty list.
/// * An out-of-bounds range, or a stride whose sign points away from `to`, yields an empty
///   list.
///
/// # Errors
///
/// * An index that cannot be converted to the offset type `O`.
/// * A stride of 0.
/// * A stride that cannot be converted to the offset type `O` (only checked when the slice is
///   not already known to be empty).
/// * Any error reported by `GenericListArray::try_new` when assembling the output.
///
/// # Panics
///
/// Panics if the computed inclusive end position lies beyond the end of the row, or if a list
/// offset is negative; both indicate a malformed input array.
fn general_array_slice<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    from_array: &Int64Array,
    to_array: &Int64Array,
    stride: Option<&Int64Array>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());

    // Selected child elements are copied out of the original values into this builder.
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], true, capacity);

    // The slice syntax is compatible with DuckDB v0.8.1; `adjusted_from_index` and
    // `adjusted_to_index` implement the index rules of DuckDB's array_slice.

    /// Converts a 1-based `from` index into a 0-based position inside a list of length `len`.
    ///
    /// Returns `Ok(None)` when the resulting position is outside `0..len`, and an error when
    /// `index` does not fit into the offset type.
    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
    where
        i64: TryInto<O>,
    {
        let converted: O = match index.try_into() {
            Ok(value) => value,
            Err(_) => return exec_err!("array_slice got invalid index: {}", index),
        };

        // 0 ~ len - 1
        let zero_based = if index < 0 {
            // When index < 0 and -index > length, index is clamped to the beginning of the
            // list. Otherwise, when index < 0, the index is counted from the end of the list.
            //
            // Note, we actually test the contrapositive, index < -length, because negating a
            // negative will panic if the negative is equal to the smallest representable
            // value while negating a positive is always safe.
            if converted < (O::zero() - O::one()) * len {
                O::zero()
            } else {
                converted + len
            }
        } else {
            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
            std::cmp::max(converted - O::usize_as(1), O::usize_as(0))
        };

        // Anything outside 0..len is out of bounds.
        Ok((O::usize_as(0) <= zero_based && zero_based < len).then_some(zero_based))
    }

    /// Converts a 1-based `to` index into a 0-based, inclusive end position inside a list of
    /// length `len`.
    ///
    /// Returns `Ok(None)` when the resulting position is outside `0..len`, and an error when
    /// `index` does not fit into the offset type.
    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
    where
        i64: TryInto<O>,
    {
        let converted: O = match index.try_into() {
            Ok(value) => value,
            Err(_) => return exec_err!("array_slice got invalid index: {}", index),
        };

        // 0 ~ len - 1
        let zero_based = if index < 0 {
            // A negative `to` counts back from the end of the list and is inclusive:
            // -1 maps to the last element (len - 1).
            converted + len
        } else {
            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
            std::cmp::min(converted - O::usize_as(1), len - O::usize_as(1))
        };

        // Anything outside 0..len is out of bounds.
        Ok((O::usize_as(0) <= zero_based && zero_based < len).then_some(zero_based))
    }

    let mut offsets = vec![O::usize_as(0)];
    let mut null_builder = NullBufferBuilder::new(array.len());

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let start = offset_window[0];
        let end = offset_window[1];
        let len = end - start;
        // Exactly one offset is pushed per processed row, so this is the end of the
        // previous output row, i.e. where the current output row begins.
        let row_start = offsets[row_index];

        // If any input is null, return null. The stride has to be checked as well:
        // reading the value behind a null slot would yield an arbitrary number.
        if array.is_null(row_index)
            || from_array.is_null(row_index)
            || to_array.is_null(row_index)
            || stride.is_some_and(|s| s.is_null(row_index))
        {
            mutable.extend_nulls(1);
            offsets.push(row_start + O::usize_as(1));
            null_builder.append_null();
            continue;
        }
        null_builder.append_non_null();

        // Empty arrays always return an empty array.
        if len == O::usize_as(0) {
            offsets.push(row_start);
            continue;
        }

        let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
        let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;

        let (Some(from), Some(to)) = (from_index, to_index) else {
            // invalid range, return empty array
            offsets.push(row_start);
            continue;
        };

        // Default stride is 1 if not provided
        let stride_value = stride.map_or(1, |s| s.value(row_index));
        if stride_value == 0 {
            return exec_err!(
                "array_slice got invalid stride: {:?}, it cannot be 0",
                stride_value
            );
        }
        if (from < to && stride_value < 0) || (from > to && stride_value > 0) {
            // The stride points away from `to`: return empty array
            offsets.push(row_start);
            continue;
        }

        // A stride that does not fit the offset type is rejected.
        let _: O = stride_value.try_into().map_err(|_| {
            internal_datafusion_err!("array_slice got invalid stride: {}", stride_value)
        })?;

        assert!(start + to <= end);
        // Absolute positions (inclusive) inside the child values array.
        let first = (start + from)
            .to_usize()
            .expect("list offsets are non-negative");
        let last = (start + to)
            .to_usize()
            .expect("list offsets are non-negative");

        if stride_value == 1 {
            // Contiguous range (`from <= to` is guaranteed here): copy it in one go.
            mutable.extend(0, first, last + 1);
            offsets.push(row_start + (to - from + O::usize_as(1)));
            continue;
        }

        // Walk the inclusive range with `step_by` on `usize` positions instead of adding the
        // stride to an offset-typed cursor: the addition could overflow for very large
        // strides, whereas `step_by` simply stops once the range is exhausted.
        let step: usize = stride_value
            .unsigned_abs()
            .try_into()
            .unwrap_or(usize::MAX);
        let mut cnt = 0usize;
        if stride_value > 0 {
            // Here `from <= to`: step forwards from `first` up to `last`.
            for position in (first..=last).step_by(step) {
                mutable.extend(0, position, position + 1);
                cnt += 1;
            }
        } else {
            // Here `from >= to`: step backwards from `first` down to `last`.
            for position in (last..=first).rev().step_by(step) {
                mutable.extend(0, position, position + 1);
                cnt += 1;
            }
        }
        offsets.push(row_start + O::usize_as(cnt));
    }

    let data = mutable.freeze();

    Ok(Arc::new(GenericListArray::<O>::try_new(
        Arc::new(Field::new_list_field(array.value_type(), true)),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow::array::make_array(data),
        null_builder.finish(),
    )?))
}