in datafusion/functions/src/string/concat_ws.rs [105:286]
/// Evaluates `concat_ws(separator, value, ...)`: joins the non-null string
/// values, inserting the separator between consecutive ones.
///
/// The first argument is the separator; the remaining arguments are the
/// values. Null values are skipped (no separator is emitted for them).
///
/// * All-scalar input produces a `ScalarValue::Utf8` scalar; a null
///   separator produces a null scalar.
/// * If any argument is an array, the result is an array with the length of
///   the first array argument. A null scalar separator produces an all-null
///   array. When the separator is itself an array, rows with a null
///   separator get no content and the separator's null buffer is handed to
///   the builder.
///
/// # Errors
///
/// * Execution error when fewer than two arguments are supplied.
/// * Internal error when a scalar separator or scalar value is not a string.
/// * Plan error when an array value is not `Utf8`, `LargeUtf8` or `Utf8View`.
/// * Whatever `as_string_array` reports when an array separator is not a
///   `Utf8` array.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
    let ScalarFunctionArgs { args, .. } = args;

    // A separator plus at least one value is required.
    if args.len() < 2 {
        return exec_err!(
            "concat_ws was called with {} arguments. It requires at least 2.",
            args.len()
        );
    }

    // Length of the first array argument, or `None` when every argument is a scalar.
    let array_len = args.iter().find_map(|arg| match arg {
        ColumnarValue::Array(array) => Some(array.len()),
        ColumnarValue::Scalar(_) => None,
    });

    // Scalar path: every argument is a scalar, so the result is a scalar too.
    let Some(len) = array_len else {
        let ColumnarValue::Scalar(sep_scalar) = &args[0] else {
            unreachable!("no array argument was found, so the separator is a scalar")
        };
        let sep = match sep_scalar.try_as_str() {
            Some(Some(s)) => s,
            // Null separator: the whole result is null.
            Some(None) => return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))),
            None => {
                return internal_err!("Expected string literal, got {sep_scalar:?}")
            }
        };

        let mut result = String::new();
        // Tracks whether a value has already been written. `result.is_empty()`
        // cannot be used for this because the first value may be "".
        let mut wrote_value = false;
        for arg in &args[1..] {
            let ColumnarValue::Scalar(value) = arg else {
                unreachable!("no array argument was found, so every value is a scalar")
            };
            match value.try_as_str() {
                Some(Some(s)) => {
                    if wrote_value {
                        result.push_str(sep);
                    }
                    result.push_str(s);
                    wrote_value = true;
                }
                // Null string literal: skipped, and no separator is emitted.
                Some(None) => {}
                // Report the offending scalar itself in the message.
                None => return internal_err!("Expected string literal, got {value:?}"),
            }
        }
        return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result))));
    };

    // Array path.
    // Running estimate of the output byte size, used to pre-size the builder.
    let mut data_size = 0;

    // Parse the separator. Scalar separators accept the same three string
    // types that scalar values accept below.
    let sep = match &args[0] {
        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
        | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s)))
        | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) => {
            data_size += s.len() * len * (args.len() - 2); // estimate
            ColumnarValueRef::Scalar(s.as_bytes())
        }
        // Null separator: every output row is null.
        ColumnarValue::Scalar(ScalarValue::Utf8(None))
        | ColumnarValue::Scalar(ScalarValue::LargeUtf8(None))
        | ColumnarValue::Scalar(ScalarValue::Utf8View(None)) => {
            return Ok(ColumnarValue::Array(Arc::new(StringArray::new_null(len))));
        }
        ColumnarValue::Array(array) => {
            let string_array = as_string_array(array)?;
            data_size += string_array.values().len() * (args.len() - 2); // estimate
            if array.is_nullable() {
                ColumnarValueRef::NullableArray(string_array)
            } else {
                ColumnarValueRef::NonNullableArray(string_array)
            }
        }
        // Return an error instead of panicking on a non-string separator.
        ColumnarValue::Scalar(other) => {
            return internal_err!("Expected string literal, got {other:?}")
        }
    };

    // Collect the value columns. Null scalar values are dropped here because
    // they never contribute to any row.
    let mut columns = Vec::with_capacity(args.len() - 1);
    for arg in &args[1..] {
        match arg {
            ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value))
            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(maybe_value))
            | ColumnarValue::Scalar(ScalarValue::Utf8View(maybe_value)) => {
                if let Some(s) = maybe_value {
                    data_size += s.len() * len;
                    columns.push(ColumnarValueRef::Scalar(s.as_bytes()));
                }
            }
            ColumnarValue::Array(array) => match array.data_type() {
                DataType::Utf8 => {
                    let string_array = as_string_array(array)?;
                    data_size += string_array.values().len();
                    let column = if array.is_nullable() {
                        ColumnarValueRef::NullableArray(string_array)
                    } else {
                        ColumnarValueRef::NonNullableArray(string_array)
                    };
                    columns.push(column);
                }
                DataType::LargeUtf8 => {
                    let string_array = as_largestring_array(array);
                    data_size += string_array.values().len();
                    let column = if array.is_nullable() {
                        ColumnarValueRef::NullableLargeStringArray(string_array)
                    } else {
                        ColumnarValueRef::NonNullableLargeStringArray(string_array)
                    };
                    columns.push(column);
                }
                DataType::Utf8View => {
                    let string_array = as_string_view_array(array)?;
                    data_size += string_array
                        .data_buffers()
                        .iter()
                        .map(|buf| buf.len())
                        .sum::<usize>();
                    let column = if array.is_nullable() {
                        ColumnarValueRef::NullableStringViewArray(string_array)
                    } else {
                        ColumnarValueRef::NonNullableStringViewArray(string_array)
                    };
                    columns.push(column);
                }
                other => {
                    return plan_err!("Input was {other} which is not a supported datatype for concat_ws function.")
                }
            },
            // Return an error instead of panicking on a non-string scalar
            // value, matching the all-scalar path above.
            ColumnarValue::Scalar(other) => {
                return internal_err!("Expected string literal, got {other:?}")
            }
        }
    }

    let mut builder = StringArrayBuilder::with_capacity(len, data_size);
    for i in 0..len {
        // Null separator for this row: write nothing; the row's validity
        // comes from the separator's null buffer passed to `finish`.
        if !sep.is_valid(i) {
            builder.append_offset();
            continue;
        }

        let mut iter = columns.iter();
        // The first valid value is written without a leading separator.
        for column in iter.by_ref() {
            if column.is_valid(i) {
                builder.write::<false>(column, i);
                break;
            }
        }
        // Every later valid value is preceded by the separator.
        for column in iter {
            if column.is_valid(i) {
                builder.write::<false>(&sep, i);
                builder.write::<false>(column, i);
            }
        }
        builder.append_offset();
    }

    Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls()))))
}