fn sql_statement_to_plan_with_context_impl()

in datafusion/sql/src/statement.rs [211:1265]


    fn sql_statement_to_plan_with_context_impl(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        match statement {
            Statement::ExplainTable {
                describe_alias: DescribeAlias::Describe, // only parse 'DESCRIBE table_name' and not 'EXPLAIN table_name'
                table_name,
                ..
            } => self.describe_table_to_plan(table_name),
            Statement::Explain {
                verbose,
                statement,
                analyze,
                format,
                describe_alias: _,
                ..
            } => {
                let format = format.map(|format| format.to_string());
                let statement = DFStatement::Statement(statement);
                self.explain_to_plan(verbose, analyze, format, statement)
            }
            Statement::Query(query) => self.query_to_plan(*query, planner_context),
            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
            Statement::SetVariable {
                local,
                hivevar,
                variables,
                value,
            } => self.set_variable_to_plan(local, hivevar, &variables, value),

            Statement::CreateTable(CreateTable {
                temporary,
                external,
                global,
                transient,
                volatile,
                hive_distribution,
                hive_formats,
                file_format,
                location,
                query,
                name,
                columns,
                constraints,
                table_properties,
                with_options,
                if_not_exists,
                or_replace,
                without_rowid,
                like,
                clone,
                engine,
                comment,
                auto_increment_offset,
                default_charset,
                collation,
                on_commit,
                on_cluster,
                primary_key,
                order_by,
                partition_by,
                cluster_by,
                clustered_by,
                options,
                strict,
                copy_grants,
                enable_schema_evolution,
                change_tracking,
                data_retention_time_in_days,
                max_data_extension_time_in_days,
                default_ddl_collation,
                with_aggregation_policy,
                with_row_access_policy,
                with_tags,
                iceberg,
                external_volume,
                base_location,
                catalog,
                catalog_sync,
                storage_serialization_policy,
            }) if table_properties.is_empty() && with_options.is_empty() => {
                if temporary {
                    return not_impl_err!("Temporary tables not supported")?;
                }
                if external {
                    return not_impl_err!("External tables not supported")?;
                }
                if global.is_some() {
                    return not_impl_err!("Global tables not supported")?;
                }
                if transient {
                    return not_impl_err!("Transient tables not supported")?;
                }
                if volatile {
                    return not_impl_err!("Volatile tables not supported")?;
                }
                if hive_distribution != ast::HiveDistributionStyle::NONE {
                    return not_impl_err!(
                        "Hive distribution not supported: {hive_distribution:?}"
                    )?;
                }
                if !matches!(
                    hive_formats,
                    Some(ast::HiveFormat {
                        row_format: None,
                        serde_properties: None,
                        storage: None,
                        location: None,
                    })
                ) {
                    return not_impl_err!(
                        "Hive formats not supported: {hive_formats:?}"
                    )?;
                }
                if file_format.is_some() {
                    return not_impl_err!("File format not supported")?;
                }
                if location.is_some() {
                    return not_impl_err!("Location not supported")?;
                }
                if without_rowid {
                    return not_impl_err!("Without rowid not supported")?;
                }
                if like.is_some() {
                    return not_impl_err!("Like not supported")?;
                }
                if clone.is_some() {
                    return not_impl_err!("Clone not supported")?;
                }
                if engine.is_some() {
                    return not_impl_err!("Engine not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if auto_increment_offset.is_some() {
                    return not_impl_err!("Auto increment offset not supported")?;
                }
                if default_charset.is_some() {
                    return not_impl_err!("Default charset not supported")?;
                }
                if collation.is_some() {
                    return not_impl_err!("Collation not supported")?;
                }
                if on_commit.is_some() {
                    return not_impl_err!("On commit not supported")?;
                }
                if on_cluster.is_some() {
                    return not_impl_err!("On cluster not supported")?;
                }
                if primary_key.is_some() {
                    return not_impl_err!("Primary key not supported")?;
                }
                if order_by.is_some() {
                    return not_impl_err!("Order by not supported")?;
                }
                if partition_by.is_some() {
                    return not_impl_err!("Partition by not supported")?;
                }
                if cluster_by.is_some() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if clustered_by.is_some() {
                    return not_impl_err!("Clustered by not supported")?;
                }
                if options.is_some() {
                    return not_impl_err!("Options not supported")?;
                }
                if strict {
                    return not_impl_err!("Strict not supported")?;
                }
                if copy_grants {
                    return not_impl_err!("Copy grants not supported")?;
                }
                if enable_schema_evolution.is_some() {
                    return not_impl_err!("Enable schema evolution not supported")?;
                }
                if change_tracking.is_some() {
                    return not_impl_err!("Change tracking not supported")?;
                }
                if data_retention_time_in_days.is_some() {
                    return not_impl_err!("Data retention time in days not supported")?;
                }
                if max_data_extension_time_in_days.is_some() {
                    return not_impl_err!(
                        "Max data extension time in days not supported"
                    )?;
                }
                if default_ddl_collation.is_some() {
                    return not_impl_err!("Default DDL collation not supported")?;
                }
                if with_aggregation_policy.is_some() {
                    return not_impl_err!("With aggregation policy not supported")?;
                }
                if with_row_access_policy.is_some() {
                    return not_impl_err!("With row access policy not supported")?;
                }
                if with_tags.is_some() {
                    return not_impl_err!("With tags not supported")?;
                }
                if iceberg {
                    return not_impl_err!("Iceberg not supported")?;
                }
                if external_volume.is_some() {
                    return not_impl_err!("External volume not supported")?;
                }
                if base_location.is_some() {
                    return not_impl_err!("Base location not supported")?;
                }
                if catalog.is_some() {
                    return not_impl_err!("Catalog not supported")?;
                }
                if catalog_sync.is_some() {
                    return not_impl_err!("Catalog sync not supported")?;
                }
                if storage_serialization_policy.is_some() {
                    return not_impl_err!("Storage serialization policy not supported")?;
                }

                // Merge inline constraints and existing constraints
                let mut all_constraints = constraints;
                let inline_constraints = calc_inline_constraints_from_columns(&columns);
                all_constraints.extend(inline_constraints);
                // Build column default values
                let column_defaults =
                    self.build_column_defaults(&columns, planner_context)?;

                let has_columns = !columns.is_empty();
                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
                if has_columns {
                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
                }

                match query {
                    Some(query) => {
                        let plan = self.query_to_plan(*query, planner_context)?;
                        let input_schema = plan.schema();

                        let plan = if has_columns {
                            if schema.fields().len() != input_schema.fields().len() {
                                return plan_err!(
                            "Mismatch: {} columns specified, but result has {} columns",
                            schema.fields().len(),
                            input_schema.fields().len()
                        );
                            }
                            let input_fields = input_schema.fields();
                            let project_exprs = schema
                                .fields()
                                .iter()
                                .zip(input_fields)
                                .map(|(field, input_field)| {
                                    cast(
                                        col(input_field.name()),
                                        field.data_type().clone(),
                                    )
                                    .alias(field.name())
                                })
                                .collect::<Vec<_>>();

                            LogicalPlanBuilder::from(plan.clone())
                                .project(project_exprs)?
                                .build()?
                        } else {
                            plan
                        };

                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;

                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }

                    None => {
                        let plan = EmptyRelation {
                            produce_one_row: false,
                            schema,
                        };
                        let plan = LogicalPlan::EmptyRelation(plan);
                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;
                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }
                }
            }

            Statement::CreateView {
                or_replace,
                materialized,
                name,
                columns,
                query,
                options: CreateTableOptions::None,
                cluster_by,
                comment,
                with_no_schema_binding,
                if_not_exists,
                temporary,
                to,
                params,
            } => {
                if materialized {
                    return not_impl_err!("Materialized views not supported")?;
                }
                if !cluster_by.is_empty() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if with_no_schema_binding {
                    return not_impl_err!("With no schema binding not supported")?;
                }
                if if_not_exists {
                    return not_impl_err!("If not exists not supported")?;
                }
                if to.is_some() {
                    return not_impl_err!("To not supported")?;
                }

                // put the statement back together temporarily to get the SQL
                // string representation
                let stmt = Statement::CreateView {
                    or_replace,
                    materialized,
                    name,
                    columns,
                    query,
                    options: CreateTableOptions::None,
                    cluster_by,
                    comment,
                    with_no_schema_binding,
                    if_not_exists,
                    temporary,
                    to,
                    params,
                };
                let sql = stmt.to_string();
                let Statement::CreateView {
                    name,
                    columns,
                    query,
                    or_replace,
                    temporary,
                    ..
                } = stmt
                else {
                    return internal_err!("Unreachable code in create view");
                };

                let columns = columns
                    .into_iter()
                    .map(|view_column_def| {
                        if let Some(options) = view_column_def.options {
                            plan_err!(
                                "Options not supported for view columns: {options:?}"
                            )
                        } else {
                            Ok(view_column_def.name)
                        }
                    })
                    .collect::<Result<Vec<_>>>()?;

                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                plan = self.apply_expr_alias(plan, columns)?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    name: self.object_name_to_table_reference(name)?,
                    input: Arc::new(plan),
                    or_replace,
                    definition: Some(sql),
                    temporary,
                })))
            }
            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
                _ => {
                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
                }
            },
            Statement::CreateSchema {
                schema_name,
                if_not_exists,
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name: get_schema_name(&schema_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            Statement::CreateDatabase {
                db_name,
                if_not_exists,
                ..
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
                CreateCatalog {
                    catalog_name: object_name_to_string(&db_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            Statement::Drop {
                object_type,
                if_exists,
                mut names,
                cascade,
                restrict: _,
                purge: _,
                temporary: _,
            } => {
                // `restrict`, `purge` and `temporary` are ignored, and `cascade` is
                // only honored for `DROP SCHEMA`. Multiple object names are not
                // supported either.
                let name = match names.len() {
                    0 => Err(ParserError("Missing table name.".to_string()).into()),
                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
                    _ => {
                        Err(ParserError("Multiple objects not supported".to_string())
                            .into())
                    }
                }?;

                match object_type {
                    ObjectType::Table => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::View => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::Schema => {
                        let name = match name {
                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
                            TableReference::Full { catalog: _, schema: _, table: _ } => {
                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
                            }
                        }?;
                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
                            name,
                            if_exists,
                            cascade,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    _ => not_impl_err!(
                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
                    ),
                }
            }
            Statement::Prepare {
                name,
                data_types,
                statement,
            } => {
                // Convert parser data types to DataFusion data types
                let data_types: Vec<DataType> = data_types
                    .into_iter()
                    .map(|t| self.convert_data_type(&t))
                    .collect::<Result<_>>()?;

                // Create planner context with parameters
                let mut planner_context = PlannerContext::new()
                    .with_prepare_param_data_types(data_types.clone());

                // Build logical plan for inner statement of the prepare statement
                let plan = self.sql_statement_to_plan_with_context_impl(
                    *statement,
                    &mut planner_context,
                )?;
                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
                    name: ident_to_string(&name),
                    data_types,
                    input: Arc::new(plan),
                })))
            }
            Statement::Execute {
                name,
                parameters,
                using,
                // has_parentheses specifies the syntax, but the plan is the
                // same no matter the syntax used, so ignore it
                has_parentheses: _,
                immediate,
                into,
            } => {
                // `USING` is a MySQL-specific syntax and currently not supported.
                if !using.is_empty() {
                    return not_impl_err!(
                        "Execute statement with USING is not supported"
                    );
                }
                if immediate {
                    return not_impl_err!(
                        "Execute statement with IMMEDIATE is not supported"
                    );
                }
                if !into.is_empty() {
                    return not_impl_err!("Execute statement with INTO is not supported");
                }
                let empty_schema = DFSchema::empty();
                let parameters = parameters
                    .into_iter()
                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
                    .collect::<Result<Vec<Expr>>>()?;

                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
                    name: object_name_to_string(&name.unwrap()),
                    parameters,
                })))
            }
            Statement::Deallocate {
                name,
                // Similar to PostgreSQL, the PREPARE keyword is ignored
                prepare: _,
            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
                Deallocate {
                    name: ident_to_string(&name),
                },
            ))),

            Statement::ShowTables {
                extended,
                full,
                terse,
                history,
                external,
                show_options,
            } => {
                // We only support the basic "SHOW TABLES"
                // https://github.com/apache/datafusion/issues/3188
                if extended {
                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
                }
                if full {
                    return not_impl_err!("SHOW FULL TABLES not supported")?;
                }
                if terse {
                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
                }
                if history {
                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
                }
                if external {
                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
                }
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if show_in.is_some() {
                    return not_impl_err!("SHOW TABLES IN not supported")?;
                }
                if starts_with.is_some() {
                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
                }
                self.show_tables_to_plan()
            }

            Statement::ShowColumns {
                extended,
                full,
                show_options,
            } => {
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if starts_with.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!(
                        "SHOW COLUMNS with WHERE or LIKE is not supported"
                    )?;
                }
                let Some(ShowStatementIn {
                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
                    // COLUMNS FROM` which is not different in DataFusion
                    clause: _,
                    parent_type,
                    parent_name,
                }) = show_in
                else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                if let Some(parent_type) = parent_type {
                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
                }
                let Some(table_name) = parent_name else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                self.show_columns_to_plan(extended, full, table_name)
            }

            Statement::ShowFunctions { filter, .. } => {
                self.show_functions_to_plan(filter)
            }

            Statement::Insert(Insert {
                or,
                into,
                columns,
                overwrite,
                source,
                partitioned,
                after_columns,
                table,
                on,
                returning,
                ignore,
                table_alias,
                mut replace_into,
                priority,
                insert_alias,
                assignments,
                has_table_keyword,
                settings,
                format_clause,
            }) => {
                let table_name = match table {
                    TableObject::TableName(table_name) => table_name,
                    TableObject::TableFunction(_) => {
                        return not_impl_err!("INSERT INTO Table functions not supported")
                    }
                };
                if let Some(or) = or {
                    match or {
                        SqliteOnConflict::Replace => replace_into = true,
                        _ => plan_err!("Inserts with {or} clause is not supported")?,
                    }
                }
                if partitioned.is_some() {
                    plan_err!("Partitioned inserts not yet supported")?;
                }
                if !after_columns.is_empty() {
                    plan_err!("After-columns clause not supported")?;
                }
                if on.is_some() {
                    plan_err!("Insert-on clause not supported")?;
                }
                if returning.is_some() {
                    plan_err!("Insert-returning clause not supported")?;
                }
                if ignore {
                    plan_err!("Insert-ignore clause not supported")?;
                }
                let Some(source) = source else {
                    plan_err!("Inserts without a source not supported")?
                };
                if let Some(table_alias) = table_alias {
                    plan_err!(
                        "Inserts with a table alias not supported: {table_alias:?}"
                    )?
                };
                if let Some(priority) = priority {
                    plan_err!(
                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
                    )?
                };
                if insert_alias.is_some() {
                    plan_err!("Inserts with an alias not supported")?;
                }
                if !assignments.is_empty() {
                    plan_err!("Inserts with assignments not supported")?;
                }
                if settings.is_some() {
                    plan_err!("Inserts with settings not supported")?;
                }
                if format_clause.is_some() {
                    plan_err!("Inserts with format clause not supported")?;
                }
                // optional keywords don't change behavior
                let _ = into;
                let _ = has_table_keyword;
                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
            }
            Statement::Update {
                table,
                assignments,
                from,
                selection,
                returning,
                or,
            } => {
                let froms =
                    from.map(|update_table_from_kind| match update_table_from_kind {
                        UpdateTableFromKind::BeforeSet(froms) => froms,
                        UpdateTableFromKind::AfterSet(froms) => froms,
                    });
                // TODO: support multiple tables in UPDATE SET FROM
                if froms.as_ref().is_some_and(|f| f.len() > 1) {
                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
                }
                let update_from = froms.and_then(|mut f| f.pop());
                if returning.is_some() {
                    plan_err!("Update-returning clause not yet supported")?;
                }
                if or.is_some() {
                    plan_err!("ON conflict not supported")?;
                }
                self.update_to_plan(table, assignments, update_from, selection)
            }

            // DELETE: only a bare target plus an optional predicate is planned.
            // Every other clause is rejected, and the checks run in a fixed order,
            // so the first unsupported clause listed below is the one reported.
            Statement::Delete(Delete {
                tables,
                using,
                selection,
                returning,
                from,
                order_by,
                limit,
            }) => {
                if !tables.is_empty() {
                    return plan_err!("DELETE <TABLE> not supported");
                }
                if using.is_some() {
                    return plan_err!("Using clause not supported");
                }
                if returning.is_some() {
                    return plan_err!("Delete-returning clause not yet supported");
                }
                if !order_by.is_empty() {
                    return plan_err!("Delete-order-by clause not yet supported");
                }
                if limit.is_some() {
                    return plan_err!("Delete-limit clause not yet supported");
                }

                // Resolve the delete target from the FROM clause, then plan the
                // delete with the optional selection predicate.
                let target = self.get_delete_target(from)?;
                self.delete_to_plan(target, selection)
            }

            // Transaction start. The `begin: false` pattern restricts this arm to
            // statements whose `begin` flag is unset (presumably the
            // `START TRANSACTION` spelling -- confirm against the parser).
            Statement::StartTransaction {
                modes,
                begin: false,
                modifier,
                transaction,
                statements,
                exception_statements,
                has_end_keyword,
            } => {
                // Reject every feature that has no logical-plan representation.
                if let Some(modifier) = modifier {
                    return not_impl_err!(
                        "Transaction modifier not supported: {modifier}"
                    );
                }
                if !statements.is_empty() {
                    return not_impl_err!(
                        "Transaction with multiple statements not supported"
                    );
                }
                if exception_statements.is_some() {
                    return not_impl_err!(
                        "Transaction with exception statements not supported"
                    );
                }
                if has_end_keyword {
                    return not_impl_err!("Transaction with END keyword not supported");
                }
                self.validate_transaction_kind(transaction)?;

                // If a mode kind is listed more than once, the last occurrence
                // wins; when it is absent the default is SERIALIZABLE.
                let requested_isolation = modes
                    .iter()
                    .rev()
                    .find_map(|mode| match mode {
                        TransactionMode::IsolationLevel(level) => Some(*level),
                        TransactionMode::AccessMode(_) => None,
                    })
                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
                // Same rule for the access mode; the default is READ WRITE.
                let requested_access = modes
                    .iter()
                    .rev()
                    .find_map(|mode| match mode {
                        TransactionMode::AccessMode(access) => Some(*access),
                        TransactionMode::IsolationLevel(_) => None,
                    })
                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);

                // Translate the parser's enums into the logical-plan enums.
                let isolation_level = match requested_isolation {
                    ast::TransactionIsolationLevel::ReadUncommitted => {
                        TransactionIsolationLevel::ReadUncommitted
                    }
                    ast::TransactionIsolationLevel::ReadCommitted => {
                        TransactionIsolationLevel::ReadCommitted
                    }
                    ast::TransactionIsolationLevel::RepeatableRead => {
                        TransactionIsolationLevel::RepeatableRead
                    }
                    ast::TransactionIsolationLevel::Serializable => {
                        TransactionIsolationLevel::Serializable
                    }
                    ast::TransactionIsolationLevel::Snapshot => {
                        TransactionIsolationLevel::Snapshot
                    }
                };
                let access_mode = match requested_access {
                    ast::TransactionAccessMode::ReadOnly => {
                        TransactionAccessMode::ReadOnly
                    }
                    ast::TransactionAccessMode::ReadWrite => {
                        TransactionAccessMode::ReadWrite
                    }
                };

                Ok(LogicalPlan::Statement(PlanStatement::TransactionStart(
                    TransactionStart {
                        access_mode,
                        isolation_level,
                    },
                )))
            }
            // COMMIT: plain commits (optionally chained) are planned; the `end`
            // flag and any modifier are rejected, with `end` checked first.
            Statement::Commit {
                chain,
                end,
                modifier,
            } => match (end, modifier) {
                (true, _) => not_impl_err!("COMMIT AND END not supported"),
                (false, Some(modifier)) => {
                    not_impl_err!("COMMIT {modifier} not supported")
                }
                (false, None) => Ok(LogicalPlan::Statement(
                    PlanStatement::TransactionEnd(TransactionEnd {
                        conclusion: TransactionConclusion::Commit,
                        chain,
                    }),
                )),
            },
            // ROLLBACK: rolling back to a savepoint is rejected; otherwise the
            // statement becomes a `TransactionEnd` with a rollback conclusion.
            Statement::Rollback { chain, savepoint } => match savepoint {
                Some(_) => plan_err!("Savepoints not supported"),
                None => Ok(LogicalPlan::Statement(PlanStatement::TransactionEnd(
                    TransactionEnd {
                        conclusion: TransactionConclusion::Rollback,
                        chain,
                    },
                ))),
            },
            // CREATE FUNCTION: converts the declared return type, the arguments
            // (including their default expressions) and the optional body into a
            // `DdlStatement::CreateFunction`.
            Statement::CreateFunction(ast::CreateFunction {
                or_replace,
                temporary,
                name,
                args,
                return_type,
                function_body,
                behavior,
                language,
                ..
            }) => {
                let return_type = match return_type {
                    Some(t) => Some(self.convert_data_type(&t)?),
                    None => None,
                };
                // Default expressions are planned with a fresh context (this
                // deliberately shadows the caller's `planner_context`) against an
                // empty schema.
                let mut planner_context = PlannerContext::new();
                let empty_schema = &DFSchema::empty();

                let args = match args {
                    Some(function_args) => {
                        let function_args = function_args
                            .into_iter()
                            .map(|arg| {
                                let data_type = self.convert_data_type(&arg.data_type)?;

                                let default_expr = match arg.default_expr {
                                    Some(expr) => Some(self.sql_to_expr(
                                        expr,
                                        empty_schema,
                                        &mut planner_context,
                                    )?),
                                    None => None,
                                };
                                Ok(OperateFunctionArg {
                                    name: arg.name,
                                    default_expr,
                                    data_type,
                                })
                            })
                            .collect::<Result<Vec<OperateFunctionArg>>>();
                        Some(function_args?)
                    }
                    None => None,
                };
                // At the moment functions can't be qualified `schema.name`
                let name = match &name.0[..] {
                    [] => exec_err!("Function should have name")?,
                    // Return an error instead of panicking if the single name part
                    // is not a plain identifier.
                    [n] => match n.as_ident() {
                        Some(ident) => ident.value.clone(),
                        None => {
                            not_impl_err!("Function name must be a plain identifier")?
                        }
                    },
                    [..] => not_impl_err!("Qualified functions are not supported")?,
                };

                // Plan the function body as a DataFusion expression. A second
                // fresh context is used, seeded with the argument data types as
                // prepare-parameter types (presumably so placeholders in the body
                // resolve to the declared argument types -- confirm).
                let arg_types = args.as_ref().map(|arg| {
                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
                });
                let mut planner_context = PlannerContext::new()
                    .with_prepare_param_data_types(arg_types.unwrap_or_default());

                let function_body = match function_body {
                    Some(r) => Some(self.sql_to_expr(
                        // Every body variant carries a single expression.
                        match r {
                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
                            ast::CreateFunctionBody::Return(expr) => expr,
                        },
                        &DFSchema::empty(),
                        &mut planner_context,
                    )?),
                    None => None,
                };

                let params = CreateFunctionBody {
                    language,
                    // Map the SQL behavior keyword onto the matching volatility.
                    behavior: behavior.map(|b| match b {
                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
                        ast::FunctionBehavior::Stable => Volatility::Stable,
                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
                    }),
                    function_body,
                };

                let statement = DdlStatement::CreateFunction(CreateFunction {
                    or_replace,
                    temporary,
                    name,
                    return_type,
                    args,
                    params,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                });

                Ok(LogicalPlan::Ddl(statement))
            }
            // DROP FUNCTION: plans a `DdlStatement::DropFunction` for a single,
            // unqualified function name.
            Statement::DropFunction {
                if_exists,
                func_desc,
                ..
            } => {
                // Only the first function description is planned; any further
                // entries in `func_desc` are ignored.
                if let Some(desc) = func_desc.first() {
                    // At the moment functions can't be qualified `schema.name`
                    let name = match &desc.name.0[..] {
                        [] => exec_err!("Function should have name")?,
                        // Return an error instead of panicking if the single name
                        // part is not a plain identifier.
                        [n] => match n.as_ident() {
                            Some(ident) => ident.value.clone(),
                            None => not_impl_err!(
                                "Function name must be a plain identifier"
                            )?,
                        },
                        [..] => not_impl_err!("Qualified functions are not supported")?,
                    };
                    let statement = DdlStatement::DropFunction(DropFunction {
                        if_exists,
                        name,
                        schema: DFSchemaRef::new(DFSchema::empty()),
                    });
                    Ok(LogicalPlan::Ddl(statement))
                } else {
                    exec_err!("Function name not provided")
                }
            }
            // CREATE INDEX: resolves the target table, plans the indexed columns
            // as sort expressions against that table's schema, and emits a
            // `DdlStatement::CreateIndex`.
            Statement::CreateIndex(CreateIndex {
                name,
                table_name,
                using,
                columns,
                unique,
                if_not_exists,
                ..
            }) => {
                let table = self.object_name_to_table_reference(table_name)?;
                // The column expressions are planned against the schema of the
                // table being indexed.
                let source = self.context_provider.get_table_source(table.clone())?;
                let table_schema = source.schema().to_dfschema_ref()?;
                let columns = self.order_by_to_sort_expr(
                    columns,
                    &table_schema,
                    planner_context,
                    false,
                    None,
                )?;

                let index = PlanCreateIndex {
                    // The index name and USING method are optional and are kept
                    // as plain strings.
                    name: name.as_ref().map(object_name_to_string),
                    table,
                    using: using.as_ref().map(ident_to_string),
                    columns,
                    unique,
                    if_not_exists,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                };
                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(index)))
            }
            // Any statement kind not matched by an earlier arm is reported as
            // not implemented, echoing the statement back in the message.
            stmt => not_impl_err!("Unsupported SQL statement: {stmt}"),
        }
    }