in arrow-select/src/take.rs [190:347]
/// Selects the elements of `values` at the positions given by `indices`,
/// dispatching on the data type of `values` to the matching take kernel.
///
/// Primitive arrays are handled by the first arm of
/// `downcast_primitive_array!`; all other supported types are matched
/// explicitly. Struct and union arrays recurse into `take_impl` for each of
/// their children.
///
/// # Errors
///
/// Propagates any error produced by the per-type kernels, by the filter
/// kernel used for dense unions, or by `UnionArray::try_new`.
///
/// # Panics
///
/// Panics through `unimplemented!` when no arm handles the data type of
/// `values`, and through `unwrap` if a downcast or an index-to-`usize`
/// conversion fails.
fn take_impl<IndexType: ArrowPrimitiveType>(
    values: &dyn Array,
    indices: &PrimitiveArray<IndexType>,
) -> Result<ArrayRef, ArrowError> {
    downcast_primitive_array! {
        values => Ok(Arc::new(take_primitive(values, indices)?)),
        DataType::Boolean => {
            let bools = values.as_any().downcast_ref::<BooleanArray>().unwrap();
            Ok(Arc::new(take_boolean(bools, indices)))
        }

        // String and binary arrays share the byte / byte-view kernels.
        DataType::Utf8 => Ok(Arc::new(take_bytes(values.as_string::<i32>(), indices)?)),
        DataType::LargeUtf8 => Ok(Arc::new(take_bytes(values.as_string::<i64>(), indices)?)),
        DataType::Utf8View => Ok(Arc::new(take_byte_view(values.as_string_view(), indices)?)),
        DataType::Binary => Ok(Arc::new(take_bytes(values.as_binary::<i32>(), indices)?)),
        DataType::LargeBinary => Ok(Arc::new(take_bytes(values.as_binary::<i64>(), indices)?)),
        DataType::BinaryView => Ok(Arc::new(take_byte_view(values.as_binary_view(), indices)?)),
        DataType::FixedSizeBinary(size) => {
            let binary = values
                .as_any()
                .downcast_ref::<FixedSizeBinaryArray>()
                .unwrap();
            let taken = take_fixed_size_binary(binary, indices, *size)?;
            Ok(Arc::new(taken))
        }

        // List-like arrays.
        DataType::List(_) => Ok(Arc::new(take_list::<_, Int32Type>(values.as_list(), indices)?)),
        DataType::LargeList(_) => Ok(Arc::new(take_list::<_, Int64Type>(values.as_list(), indices)?)),
        DataType::FixedSizeList(_, length) => {
            let lists = values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .unwrap();
            let taken = take_fixed_size_list(lists, indices, *length as u32)?;
            Ok(Arc::new(taken))
        }
        DataType::Map(_, _) => {
            // Run the list kernel over the map viewed as a list, then put the
            // original map data type back on the result.
            let entries = ListArray::from(values.as_map().clone());
            let taken = take_list::<_, Int32Type>(&entries, indices)?;
            let map_data = taken
                .into_data()
                .into_builder()
                .data_type(values.data_type().clone());
            // SAFETY (NOTE(review)): the data comes from the `ListArray` built by
            // `take_list` above and only its data type is replaced; this presumably
            // relies on map and list arrays sharing a physical layout - confirm.
            Ok(Arc::new(MapArray::from(unsafe { map_data.build_unchecked() })))
        }

        DataType::Struct(fields) => {
            let source: &StructArray = values.as_struct();

            // Take every child column with the same indices.
            let taken_columns = source
                .columns()
                .iter()
                .map(|column| take_impl(column.as_ref(), indices))
                .collect::<Result<Vec<ArrayRef>, _>>()?;
            let field_columns: Vec<(FieldRef, ArrayRef)> =
                fields.iter().cloned().zip(taken_columns).collect();

            // An output slot is valid only when its index is non-null and the
            // struct row it points at is valid.
            let validity: Buffer = indices
                .iter()
                .map(|slot| match slot {
                    Some(row) => source.is_valid(row.to_usize().unwrap()),
                    None => false,
                })
                .collect();

            if field_columns.is_empty() {
                // With no child columns the length has to be passed explicitly.
                let nulls = NullBuffer::new(BooleanBuffer::new(validity, 0, indices.len()));
                Ok(Arc::new(StructArray::new_empty_fields(indices.len(), Some(nulls))))
            } else {
                Ok(Arc::new(StructArray::from((field_columns, validity))) as ArrayRef)
            }
        }
        DataType::Dictionary(_, _) => downcast_dictionary_array! {
            values => Ok(Arc::new(take_dict(values, indices)?)),
            t => unimplemented!("Take not supported for dictionary type {:?}", t)
        }
        DataType::RunEndEncoded(_, _) => downcast_run_array! {
            values => Ok(Arc::new(take_run(values, indices)?)),
            t => unimplemented!("Take not supported for run type {:?}", t)
        }
        DataType::Null => {
            // Taking from a null array always yields a null array of
            // `indices.len()` elements.
            if indices.len() > values.len() {
                // The input is too short to slice from, so build a fresh array.
                Ok(new_null_array(&DataType::Null, indices.len()))
            } else {
                // Slicing the input avoids allocating a new null array.
                Ok(values.slice(0, indices.len()))
            }
        }
        DataType::Union(fields, UnionMode::Sparse) => {
            let union_array = values.as_any().downcast_ref::<UnionArray>().unwrap();
            let type_ids = take_native(union_array.type_ids(), indices);
            // Each child of a sparse union is taken with the same indices.
            let children = fields
                .iter()
                .map(|(type_id, _field)| take_impl(union_array.child(type_id), indices))
                .collect::<Result<Vec<_>, _>>()?;
            Ok(Arc::new(UnionArray::try_new(fields.clone(), type_ids, None, children)?))
        }
        DataType::Union(fields, UnionMode::Dense) => {
            let union_array = values.as_any().downcast_ref::<UnionArray>().unwrap();

            let taken_type_ids = <PrimitiveArray<Int8Type>>::new(
                take_native(union_array.type_ids(), indices),
                None,
            );
            let taken_offsets = <PrimitiveArray<Int32Type>>::new(
                take_native(union_array.offsets().unwrap(), indices),
                None,
            );

            // For each child, keep only the offsets of the selected slots that
            // carry that child's type id, then take those rows from the child.
            let children = fields
                .iter()
                .map(|(field_type_id, _)| {
                    let selected = BooleanArray::from_unary(&taken_type_ids, |value_type_id| {
                        value_type_id == field_type_id
                    });
                    let child_indices = crate::filter::filter(&taken_offsets, &selected)?;
                    let child = union_array.child(field_type_id);
                    take_impl(child, child_indices.as_primitive::<Int32Type>())
                })
                .collect::<Result<_, _>>()?;

            // Hand out new offsets: a running counter per type id, indexed by
            // the type id itself.
            let mut next_offset = [0; 128];
            let new_offsets = taken_type_ids
                .values()
                .iter()
                .map(|&type_id| {
                    let counter = &mut next_offset[type_id as usize];
                    let assigned = *counter;
                    *counter += 1;
                    assigned
                })
                .collect();

            let (_, type_id_buffer, _) = taken_type_ids.into_parts();
            let array = UnionArray::try_new(
                fields.clone(),
                type_id_buffer,
                Some(new_offsets),
                children,
            )?;
            Ok(Arc::new(array))
        }
        t => unimplemented!("Take not supported for data type {:?}", t)
    }
}