fn take_impl()

in arrow-select/src/take.rs [190:347]


/// Selects the type-specific take kernel for `values` based on its [`DataType`] and
/// returns the array produced by that kernel for the given `indices`.
///
/// Primitive arrays are handled by the leading arm of `downcast_primitive_array!`;
/// every other supported type has an explicit arm below. Nested types (struct and
/// union) call back into `take_impl` for each of their children.
///
/// # Errors
///
/// Propagates any [`ArrowError`] returned by the per-type kernels, by
/// `crate::filter::filter`, or by `UnionArray::try_new`.
///
/// # Panics
///
/// Panics through `unimplemented!` when `values` has a data type (or dictionary /
/// run-end key type) that no arm covers. The struct arm also unwraps the
/// index-to-`usize` conversion, so an index that cannot be represented as `usize`
/// panics there.
fn take_impl<IndexType: ArrowPrimitiveType>(
    values: &dyn Array,
    indices: &PrimitiveArray<IndexType>,
) -> Result<ArrayRef, ArrowError> {
    downcast_primitive_array! {
        values => Ok(Arc::new(take_primitive(values, indices)?)),
        DataType::Boolean => {
            let bools = values.as_any().downcast_ref::<BooleanArray>().unwrap();
            Ok(Arc::new(take_boolean(bools, indices)))
        }
        DataType::Utf8 => {
            let strings = values.as_string::<i32>();
            Ok(Arc::new(take_bytes(strings, indices)?))
        }
        DataType::LargeUtf8 => {
            let strings = values.as_string::<i64>();
            Ok(Arc::new(take_bytes(strings, indices)?))
        }
        DataType::Utf8View => {
            let views = values.as_string_view();
            Ok(Arc::new(take_byte_view(views, indices)?))
        }
        DataType::List(_) => {
            let taken = take_list::<_, Int32Type>(values.as_list(), indices)?;
            Ok(Arc::new(taken))
        }
        DataType::LargeList(_) => {
            let taken = take_list::<_, Int64Type>(values.as_list(), indices)?;
            Ok(Arc::new(taken))
        }
        DataType::FixedSizeList(_, length) => {
            let lists = values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .unwrap();
            let taken = take_fixed_size_list(lists, indices, *length as u32)?;
            Ok(Arc::new(taken))
        }
        DataType::Map(_, _) => {
            // Take the map through its list representation, then put the original
            // map data type back on the resulting array data.
            let entries = ListArray::from(values.as_map().clone());
            let taken = take_list::<_, Int32Type>(&entries, indices)?;
            let map_data = taken
                .into_data()
                .into_builder()
                .data_type(values.data_type().clone());
            // NOTE(review): `build_unchecked` skips validation; presumably sound because
            // only the data type is swapped on data produced by `take_list` — confirm.
            Ok(Arc::new(MapArray::from(unsafe { map_data.build_unchecked() })))
        }
        DataType::Struct(fields) => {
            let struct_arr: &StructArray = values.as_struct();

            // Take every child column with the same indices.
            let taken_columns = struct_arr
                .columns()
                .iter()
                .map(|column| take_impl(column.as_ref(), indices))
                .collect::<Result<Vec<ArrayRef>, _>>()?;
            let named_columns: Vec<(FieldRef, ArrayRef)> =
                fields.iter().cloned().zip(taken_columns).collect();

            // Validity of the result: a null index yields a null slot, otherwise the
            // slot inherits the validity of the struct row it points at.
            let validity: Buffer = indices
                .iter()
                .map(|slot| match slot {
                    Some(idx) => struct_arr.is_valid(idx.to_usize().unwrap()),
                    None => false,
                })
                .collect();

            if named_columns.is_empty() {
                // Without child columns the length has to be supplied explicitly.
                let nulls = NullBuffer::new(BooleanBuffer::new(validity, 0, indices.len()));
                Ok(Arc::new(StructArray::new_empty_fields(indices.len(), Some(nulls))))
            } else {
                Ok(Arc::new(StructArray::from((named_columns, validity))) as ArrayRef)
            }
        }
        DataType::Dictionary(_, _) => downcast_dictionary_array! {
            values => Ok(Arc::new(take_dict(values, indices)?)),
            other => unimplemented!("Take not supported for dictionary type {:?}", other)
        }
        DataType::RunEndEncoded(_, _) => downcast_run_array! {
            values => Ok(Arc::new(take_run(values, indices)?)),
            other => unimplemented!("Take not supported for run type {:?}", other)
        }
        DataType::Binary => {
            let binaries = values.as_binary::<i32>();
            Ok(Arc::new(take_bytes(binaries, indices)?))
        }
        DataType::LargeBinary => {
            let binaries = values.as_binary::<i64>();
            Ok(Arc::new(take_bytes(binaries, indices)?))
        }
        DataType::BinaryView => {
            let views = values.as_binary_view();
            Ok(Arc::new(take_byte_view(views, indices)?))
        }
        DataType::FixedSizeBinary(size) => {
            let binaries = values
                .as_any()
                .downcast_ref::<FixedSizeBinaryArray>()
                .unwrap();
            Ok(Arc::new(take_fixed_size_binary(binaries, indices, *size)?))
        }
        DataType::Null => {
            // Taking from a null array always produces a null array of `indices.len()`.
            let wanted = indices.len();
            if values.len() < wanted {
                // The input is too short to slice from, so build a fresh null array.
                Ok(new_null_array(&DataType::Null, wanted))
            } else {
                // Slice the existing array instead of creating a new one.
                Ok(values.slice(0, wanted))
            }
        }
        DataType::Union(fields, UnionMode::Sparse) => {
            let union_arr = values.as_any().downcast_ref::<UnionArray>().unwrap();
            let type_ids = take_native(union_arr.type_ids(), indices);

            // In the sparse layout each child is taken with the same indices.
            let mut children = Vec::with_capacity(fields.len());
            for (type_id, _) in fields.iter() {
                children.push(take_impl(union_arr.child(type_id), indices)?);
            }

            Ok(Arc::new(UnionArray::try_new(fields.clone(), type_ids, None, children)?))
        }
        DataType::Union(fields, UnionMode::Dense) => {
            let union_arr = values.as_any().downcast_ref::<UnionArray>().unwrap();

            // Type ids and child offsets of the selected slots, in output order.
            let type_ids = PrimitiveArray::<Int8Type>::new(
                take_native(union_arr.type_ids(), indices),
                None,
            );
            let offsets = PrimitiveArray::<Int32Type>::new(
                take_native(union_arr.offsets().unwrap(), indices),
                None,
            );

            // For each child, keep only the offsets of slots that refer to it and take
            // those positions from the child.
            let children = fields
                .iter()
                .map(|(child_id, _)| {
                    let selects_child =
                        BooleanArray::from_unary(&type_ids, |id| id == child_id);
                    let child_indices = crate::filter::filter(&offsets, &selects_child)?;
                    take_impl(
                        union_arr.child(child_id),
                        child_indices.as_primitive::<Int32Type>(),
                    )
                })
                .collect::<Result<_, _>>()?;

            // Every taken child is now densely packed in output order, so the new
            // offset of a slot is the count of earlier slots with the same type id.
            let mut next_child_offset = [0; 128];
            let dense_offsets = type_ids
                .values()
                .iter()
                .map(|&id| {
                    let slot = id as usize;
                    let assigned = next_child_offset[slot];
                    next_child_offset[slot] += 1;
                    assigned
                })
                .collect();

            let (_, type_ids, _) = type_ids.into_parts();

            Ok(Arc::new(UnionArray::try_new(
                fields.clone(),
                type_ids,
                Some(dense_offsets),
                children,
            )?))
        }
        other => unimplemented!("Take not supported for data type {:?}", other)
    }
}