in native/core/src/execution/planner.rs [2160:2250]
/// Builds the physical expression for a scalar function call described by `expr`.
///
/// Steps:
/// 1. Every argument in `expr.args` is planned with `create_expr` against `input_schema`.
/// 2. The return type is taken from `expr.return_type` when present (no argument
///    coercion is applied in that case). Otherwise the function is looked up in the
///    session context, its argument types are coerced on a best-effort basis, and the
///    function itself is asked for its return type.
/// 3. Arguments whose planned type differs from the coerced type are wrapped in a
///    `CastExpr`.
///
/// # Errors
///
/// Propagates any error from planning the arguments, resolving their data types,
/// looking up the function in the session context, computing its return type, or
/// from `create_comet_physical_fun`.
fn create_scalar_function_expr(
    &self,
    expr: &ScalarFunc,
    input_schema: SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
    let args = expr
        .args
        .iter()
        .map(|x| self.create_expr(x, Arc::clone(&input_schema)))
        .collect::<Result<Vec<_>, _>>()?;

    let fun_name = &expr.func;

    let input_expr_types = args
        .iter()
        .map(|x| x.data_type(input_schema.as_ref()))
        .collect::<Result<Vec<_>, _>>()?;

    let (data_type, coerced_input_types) =
        match expr.return_type.as_ref().map(to_arrow_datatype) {
            // An explicit return type was supplied: use it and keep the argument
            // types as they are, so no casts are inserted below.
            Some(t) => (t, input_expr_types.clone()),
            None => {
                // Only the lookup used for type resolution is redirected; the
                // original `fun_name` is still used to create the function below.
                let lookup_name = match fun_name.as_ref() {
                    "read_side_padding" => "rpad", // use the same return type as rpad
                    other => other,
                };
                let func = self.session_ctx.udf(lookup_name)?;

                // Coercion is best effort: when the function cannot coerce the
                // given types, fall back to the types the arguments already have.
                let coerced_types = func
                    .coerce_types(&input_expr_types)
                    .unwrap_or_else(|_| input_expr_types.clone());

                // Literal arguments are handed over as scalar values; every other
                // argument is reported as `None`.
                let scalar_arguments = args
                    .iter()
                    .map(|e| {
                        e.as_ref()
                            .as_any()
                            .downcast_ref::<Literal>()
                            .map(|lit| lit.value())
                    })
                    .collect::<Vec<_>>();

                // Every argument is reported as nullable.
                let nullables = vec![true; scalar_arguments.len()];

                let return_type_args = ReturnTypeArgs {
                    arg_types: &coerced_types,
                    scalar_arguments: &scalar_arguments,
                    nullables: &nullables,
                };
                let data_type = func
                    .inner()
                    .return_type_from_args(return_type_args)?
                    .return_type()
                    .clone();
                (data_type, coerced_types)
            }
        };

    let fun_expr =
        create_comet_physical_fun(fun_name, data_type.clone(), &self.session_ctx.state())?;

    // Insert a cast in front of every argument whose planned type differs from the
    // type the function expects after coercion.
    let args = args
        .into_iter()
        .zip(input_expr_types.into_iter().zip(coerced_input_types))
        .map(|(arg, (from_type, to_type))| -> Arc<dyn PhysicalExpr> {
            if from_type != to_type {
                Arc::new(CastExpr::new(
                    arg,
                    to_type,
                    // `safe: false` makes a failed cast an error rather than a null.
                    Some(CastOptions {
                        safe: false,
                        ..Default::default()
                    }),
                ))
            } else {
                arg
            }
        })
        .collect::<Vec<_>>();

    // `args` is already an owned vector that is not used afterwards, so it is moved
    // into the expression instead of being cloned.
    let scalar_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
        fun_name,
        fun_expr,
        args,
        data_type,
    ));
    Ok(scalar_expr)
}