fn create_scalar_function_expr()

in native/core/src/execution/planner.rs [2160:2250]


    /// Builds the physical expression for a scalar function call.
    ///
    /// The steps are:
    /// 1. Convert every argument of `expr` into a physical expression against
    ///    `input_schema` and compute each argument's data type.
    /// 2. Determine the return type: use `expr.return_type` when it is present,
    ///    otherwise look the function up in the session context and ask it for
    ///    its return type (after attempting type coercion of the arguments).
    /// 3. Resolve the function implementation via `create_comet_physical_fun`.
    /// 4. Wrap any argument whose type differs from its coerced type in a
    ///    `CastExpr`, then assemble the final `ScalarFunctionExpr`.
    ///
    /// # Errors
    ///
    /// Returns an error if an argument expression cannot be created, if an
    /// argument's data type cannot be computed, if the function is not
    /// registered in the session context (only consulted when no return type
    /// was supplied), if the return type cannot be derived, or if
    /// `create_comet_physical_fun` fails.
    fn create_scalar_function_expr(
        &self,
        expr: &ScalarFunc,
        input_schema: SchemaRef,
    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
        let args = expr
            .args
            .iter()
            .map(|x| self.create_expr(x, Arc::clone(&input_schema)))
            .collect::<Result<Vec<_>, _>>()?;

        let fun_name = &expr.func;
        let input_expr_types = args
            .iter()
            .map(|x| x.data_type(input_schema.as_ref()))
            .collect::<Result<Vec<_>, _>>()?;

        let (data_type, coerced_input_types) =
            match expr.return_type.as_ref().map(to_arrow_datatype) {
                // The caller supplied the return type: no coercion is attempted, so
                // the "coerced" types are simply the argument types themselves.
                Some(t) => (t, input_expr_types.clone()),
                None => {
                    // Only the name used for the return-type lookup is remapped here;
                    // the function implementation is still resolved from `fun_name`.
                    let lookup_name = match fun_name.as_ref() {
                        "read_side_padding" => "rpad", // use the same return type as rpad
                        other => other,
                    };
                    let func = self.session_ctx.udf(lookup_name)?;

                    // Best effort: if coercion fails, fall back to the uncoerced types.
                    let coerced_types = func
                        .coerce_types(&input_expr_types)
                        .unwrap_or_else(|_| input_expr_types.clone());

                    // TODO this should try and find scalar
                    // Arguments that are literals expose their value; all others are `None`.
                    let arguments = args
                        .iter()
                        .map(|e| {
                            e.as_ref()
                                .as_any()
                                .downcast_ref::<Literal>()
                                .map(|lit| lit.value())
                        })
                        .collect::<Vec<_>>();

                    // Every argument is reported as nullable.
                    let nullables = vec![true; arguments.len()];

                    let return_type_args = ReturnTypeArgs {
                        arg_types: &coerced_types,
                        scalar_arguments: &arguments,
                        nullables: &nullables,
                    };

                    let data_type = func
                        .inner()
                        .return_type_from_args(return_type_args)?
                        .return_type()
                        .clone();

                    (data_type, coerced_types)
                }
            };

        let fun_expr =
            create_comet_physical_fun(fun_name, data_type.clone(), &self.session_ctx.state())?;

        // Insert a cast (with `safe: false`) for each argument whose coerced type
        // differs from its actual type; arguments that already match pass through.
        let args = args
            .into_iter()
            .zip(input_expr_types.into_iter().zip(coerced_input_types))
            .map(|(expr, (from_type, to_type))| {
                if from_type != to_type {
                    Arc::new(CastExpr::new(
                        expr,
                        to_type,
                        Some(CastOptions {
                            safe: false,
                            ..Default::default()
                        }),
                    ))
                } else {
                    expr
                }
            })
            .collect::<Vec<_>>();

        // `args` is not used afterwards, so it is moved in rather than cloned.
        let scalar_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
            fun_name,
            fun_expr,
            args,
            data_type,
        ));

        Ok(scalar_expr)
    }