in datafusion/functions-nested/src/extract.rs [459:648]
/// Slices every list of `array` with the per-row bounds in `from_array` / `to_array`
/// and the optional per-row `stride`, returning a new list array with one row per input row.
///
/// Index rules (the slice syntax is kept compatible with DuckDB v0.8.1):
/// * indices are 1-based; both the adjusted `from` and `to` positions are inclusive,
/// * a negative index counts from the end of the list (`-1` maps to the last element),
/// * a `from` index before the start of the list is clamped to the first element,
/// * a non-negative `to` index past the end of the list is clamped to the last element,
/// * any other out-of-range bound produces an empty list for that row.
///
/// A row is NULL in the output when the list, `from`, `to` or (if provided) `stride`
/// is NULL for that row. When `stride` is `None` a stride of 1 is used.
///
/// # Errors
/// * execution error when an index does not fit in the offset type `O`,
/// * execution error when a stride is `0`,
/// * internal error when a stride does not fit in the offset type `O`.
fn general_array_slice<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    from_array: &Int64Array,
    to_array: &Int64Array,
    stride: Option<&Int64Array>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    // We have the slice syntax compatible with DuckDB v0.8.1.
    // The rules in `adjusted_from_index` and `adjusted_to_index` follow array_slice in duckdb.

    /// Converts the user supplied `from` index into a zero-based position inside a list
    /// of length `len`, or `None` when the position falls outside `0..len`.
    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
    where
        i64: TryInto<O>,
    {
        let converted: O = match index.try_into() {
            Ok(converted) => converted,
            Err(_) => return exec_err!("array_slice got invalid index: {}", index),
        };

        // 0 ~ len - 1
        let zero_based = if index < 0 {
            // When index < 0 and -index > length, index is clamped to the beginning of the list.
            // Otherwise, when index < 0, the index is counted from the end of the list.
            //
            // Note, we actually test the contrapositive, index < -length, because negating a
            // negative will panic if the negative is equal to the smallest representable value
            // while negating a positive is always safe.
            if converted < (O::zero() - O::one()) * len {
                O::zero()
            } else {
                converted + len
            }
        } else {
            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
            std::cmp::max(converted - O::usize_as(1), O::usize_as(0))
        };

        if O::usize_as(0) <= zero_based && zero_based < len {
            Ok(Some(zero_based))
        } else {
            // Out of bounds
            Ok(None)
        }
    }

    /// Converts the user supplied `to` index into a zero-based, inclusive end position
    /// inside a list of length `len`, or `None` when the position falls outside `0..len`.
    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
    where
        i64: TryInto<O>,
    {
        let converted: O = match index.try_into() {
            Ok(converted) => converted,
            Err(_) => return exec_err!("array_slice got invalid index: {}", index),
        };

        // 0 ~ len - 1
        let zero_based = if index < 0 {
            // A negative index counts from the end of the list: -1 maps to `len - 1`.
            converted + len
        } else {
            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
            std::cmp::min(converted - O::usize_as(1), len - O::usize_as(1))
        };

        if O::usize_as(0) <= zero_based && zero_based < len {
            Ok(Some(zero_based))
        } else {
            // Out of bounds
            Ok(None)
        }
    }

    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], true, capacity);

    // `offsets[i]` is the start of output row `i`; one more entry is pushed per input row.
    let mut offsets = vec![O::usize_as(0)];
    let mut null_builder = NullBufferBuilder::new(array.len());

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let start = offset_window[0];
        let end = offset_window[1];
        let len = end - start;

        // If any input is null, return null. This includes the stride: reading the value of
        // a null stride slot would otherwise be interpreted as a real stride.
        if array.is_null(row_index)
            || from_array.is_null(row_index)
            || to_array.is_null(row_index)
            || stride.is_some_and(|s| s.is_null(row_index))
        {
            // The null row occupies a single null slot in the child values.
            mutable.extend_nulls(1);
            offsets.push(offsets[row_index] + O::usize_as(1));
            null_builder.append_null();
            continue;
        }
        null_builder.append_non_null();

        // Empty arrays always return an empty array.
        if len == O::usize_as(0) {
            offsets.push(offsets[row_index]);
            continue;
        }

        let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
        let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;

        if let (Some(from), Some(to)) = (from_index, to_index) {
            // Default stride is 1 if not provided
            let stride = stride.map(|s| s.value(row_index)).unwrap_or(1);
            if stride.is_zero() {
                return exec_err!(
                    "array_slice got invalid stride: {:?}, it cannot be 0",
                    stride
                );
            } else if (from < to && stride.is_negative())
                || (from > to && stride.is_positive())
            {
                // The stride moves away from `to`: return empty array
                offsets.push(offsets[row_index]);
                continue;
            }

            let stride: O = stride.try_into().map_err(|_| {
                internal_datafusion_err!("array_slice got invalid stride: {}", stride)
            })?;

            if from <= to && stride > O::zero() {
                assert!(start + to <= end);
                if stride.eq(&O::one()) {
                    // Contiguous range: copy `from..=to` in one go.
                    mutable.extend(
                        0,
                        (start + from).to_usize().unwrap(),
                        (start + to + O::usize_as(1)).to_usize().unwrap(),
                    );
                    offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
                    continue;
                }

                // Forward walk with a stride > 1. `from <= to`, so at least one element
                // is always emitted.
                let last = start + to;
                let mut index = start + from;
                let mut cnt = 0;
                loop {
                    let position = index.to_usize().unwrap();
                    mutable.extend(0, position, position + 1);
                    cnt += 1;
                    // Stop before stepping past `last`. Comparing the remaining distance
                    // instead of computing `index + stride` keeps very large strides
                    // from overflowing the offset type.
                    if last - index < stride {
                        break;
                    }
                    index += stride;
                }
                offsets.push(offsets[row_index] + O::usize_as(cnt));
            } else {
                // Backward walk: the checks above leave only `from >= to` with a negative
                // stride here. `index` starts non-negative (for valid list offsets) and only
                // decreases, so `index += stride` cannot overflow.
                let mut index = start + from;
                let mut cnt = 0;
                while index >= start + to {
                    let position = index.to_usize().unwrap();
                    mutable.extend(0, position, position + 1);
                    index += stride;
                    cnt += 1;
                }
                offsets.push(offsets[row_index] + O::usize_as(cnt));
            }
        } else {
            // invalid range, return empty array
            offsets.push(offsets[row_index]);
        }
    }

    let data = mutable.freeze();

    Ok(Arc::new(GenericListArray::<O>::try_new(
        Arc::new(Field::new_list_field(array.value_type(), true)),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow::array::make_array(data),
        null_builder.finish(),
    )?))
}