fn invoke_with_args()

in datafusion/functions/src/string/concat_ws.rs [105:286]


    /// Evaluates `concat_ws(separator, value, ...)`.
    ///
    /// The first argument is the separator and the remaining arguments are the
    /// values to join. NULL values are skipped, so no separator is emitted for
    /// them.
    ///
    /// * When every argument is a scalar the result is a `ScalarValue::Utf8`;
    ///   a NULL separator yields `Utf8(None)`.
    /// * Otherwise the result is a `StringArray` whose length is taken from
    ///   the first array argument. A NULL scalar separator yields an all-null
    ///   array. For an array separator, rows whose separator is NULL get no
    ///   bytes written and the separator's null buffer is handed to the
    ///   builder's `finish`.
    ///
    /// # Errors
    ///
    /// Returns an error when fewer than two arguments are supplied, or when
    /// an argument is not one of the string types handled below.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let ScalarFunctionArgs { args, .. } = args;

        // concat_ws needs a separator plus at least one value.
        if args.len() < 2 {
            return exec_err!(
                "concat_ws was called with {} arguments. It requires at least 2.",
                args.len()
            );
        }

        // Length of the first array argument, if any argument is an array.
        let array_len = args
            .iter()
            .filter_map(|x| match x {
                ColumnarValue::Array(array) => Some(array.len()),
                _ => None,
            })
            .next();

        // Scalar: no array argument was found, so every argument is a scalar.
        let Some(len) = array_len else {
            let ColumnarValue::Scalar(sep_scalar) = &args[0] else {
                // `array_len` is `None`, so no argument is an array
                unreachable!()
            };
            let sep = match sep_scalar.try_as_str() {
                Some(Some(s)) => s,
                Some(None) => {
                    // null literal string
                    return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)));
                }
                None => {
                    return internal_err!("Expected string literal, got {sep_scalar:?}")
                }
            };

            let mut result = String::new();
            // Tracks whether a value has been written yet. Checking
            // `result.is_empty()` instead would drop the separator after an
            // empty-string first value.
            let mut wrote_value = false;
            for arg in &args[1..] {
                let ColumnarValue::Scalar(scalar) = arg else {
                    // `array_len` is `None`, so no argument is an array
                    unreachable!()
                };
                match scalar.try_as_str() {
                    Some(Some(s)) => {
                        if wrote_value {
                            result.push_str(sep);
                        }
                        result.push_str(s);
                        wrote_value = true;
                    }
                    Some(None) => {} // null literal string
                    None => {
                        // Report the offending scalar itself, not the `None`
                        // returned by `try_as_str`.
                        return internal_err!("Expected string literal, got {scalar:?}")
                    }
                }
            }

            return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result))));
        };

        // Array
        let mut data_size = 0;
        // Multiplier used to estimate how many bytes the separators contribute.
        let sep_repeats = args.len() - 2;

        // parse sep
        let sep = match &args[0] {
            ColumnarValue::Scalar(ScalarValue::Utf8(maybe_sep))
            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(maybe_sep))
            | ColumnarValue::Scalar(ScalarValue::Utf8View(maybe_sep)) => {
                let Some(s) = maybe_sep else {
                    // A NULL separator makes every output row NULL.
                    return Ok(ColumnarValue::Array(Arc::new(StringArray::new_null(len))));
                };
                data_size += s.len() * len * sep_repeats; // estimate
                ColumnarValueRef::Scalar(s.as_bytes())
            }
            ColumnarValue::Array(array) => match array.data_type() {
                DataType::Utf8 => {
                    let string_array = as_string_array(array)?;
                    data_size += string_array.values().len() * sep_repeats; // estimate
                    if array.is_nullable() {
                        ColumnarValueRef::NullableArray(string_array)
                    } else {
                        ColumnarValueRef::NonNullableArray(string_array)
                    }
                }
                DataType::LargeUtf8 => {
                    let string_array = as_largestring_array(array);
                    data_size += string_array.values().len() * sep_repeats; // estimate
                    if array.is_nullable() {
                        ColumnarValueRef::NullableLargeStringArray(string_array)
                    } else {
                        ColumnarValueRef::NonNullableLargeStringArray(string_array)
                    }
                }
                DataType::Utf8View => {
                    let string_array = as_string_view_array(array)?;
                    let buffered = string_array
                        .data_buffers()
                        .iter()
                        .map(|buf| buf.len())
                        .sum::<usize>();
                    data_size += buffered * sep_repeats; // estimate
                    if array.is_nullable() {
                        ColumnarValueRef::NullableStringViewArray(string_array)
                    } else {
                        ColumnarValueRef::NonNullableStringViewArray(string_array)
                    }
                }
                other => {
                    return plan_err!("Input was {other} which is not a supported datatype for concat_ws function.")
                }
            },
            ColumnarValue::Scalar(other) => {
                // Return an error rather than panic on a non-string separator.
                return internal_err!("Expected string literal, got {other:?}")
            }
        };

        // Collect the value columns. NULL scalar values are dropped here
        // because they never contribute to any row.
        let mut columns = Vec::with_capacity(args.len() - 1);
        for arg in &args[1..] {
            match arg {
                ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value))
                | ColumnarValue::Scalar(ScalarValue::LargeUtf8(maybe_value))
                | ColumnarValue::Scalar(ScalarValue::Utf8View(maybe_value)) => {
                    if let Some(s) = maybe_value {
                        data_size += s.len() * len;
                        columns.push(ColumnarValueRef::Scalar(s.as_bytes()));
                    }
                }
                ColumnarValue::Array(array) => {
                    let column = match array.data_type() {
                        DataType::Utf8 => {
                            let string_array = as_string_array(array)?;

                            data_size += string_array.values().len();
                            if array.is_nullable() {
                                ColumnarValueRef::NullableArray(string_array)
                            } else {
                                ColumnarValueRef::NonNullableArray(string_array)
                            }
                        }
                        DataType::LargeUtf8 => {
                            let string_array = as_largestring_array(array);

                            data_size += string_array.values().len();
                            if array.is_nullable() {
                                ColumnarValueRef::NullableLargeStringArray(string_array)
                            } else {
                                ColumnarValueRef::NonNullableLargeStringArray(string_array)
                            }
                        }
                        DataType::Utf8View => {
                            let string_array = as_string_view_array(array)?;

                            data_size += string_array
                                .data_buffers()
                                .iter()
                                .map(|buf| buf.len())
                                .sum::<usize>();
                            if array.is_nullable() {
                                ColumnarValueRef::NullableStringViewArray(string_array)
                            } else {
                                ColumnarValueRef::NonNullableStringViewArray(string_array)
                            }
                        }
                        other => {
                            return plan_err!("Input was {other} which is not a supported datatype for concat_ws function.")
                        }
                    };
                    columns.push(column);
                }
                ColumnarValue::Scalar(other) => {
                    // Same error as the all-scalar path, instead of a panic.
                    return internal_err!("Expected string literal, got {other:?}")
                }
            }
        }

        let mut builder = StringArrayBuilder::with_capacity(len, data_size);
        for i in 0..len {
            if !sep.is_valid(i) {
                // NULL separator for this row: close the row without writing
                // any bytes.
                builder.append_offset();
                continue;
            }

            // Write the first valid value without a leading separator.
            let mut iter = columns.iter();
            for column in iter.by_ref() {
                if column.is_valid(i) {
                    builder.write::<false>(column, i);
                    break;
                }
            }

            // Every later valid value is preceded by the separator.
            for column in iter {
                if column.is_valid(i) {
                    builder.write::<false>(&sep, i);
                    builder.write::<false>(column, i);
                }
            }

            builder.append_offset();
        }

        Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls()))))
    }