in datafusion/sql/src/statement.rs [211:1265]
/// Converts a single parsed SQL [`Statement`] into a [`LogicalPlan`].
///
/// The function dispatches on the statement variant. For each supported
/// variant it first rejects the clauses the planner does not implement and
/// then delegates to the matching `*_to_plan` helper or builds the DDL /
/// statement plan node directly. Any variant without a dedicated arm ends up
/// in the final catch-all arm.
///
/// `planner_context` is the context used for planning queries and
/// expressions of the statement itself. `PREPARE`, `CREATE VIEW` and
/// `CREATE FUNCTION` plan their inner query / expressions with a freshly
/// created [`PlannerContext`] instead.
///
/// # Errors
///
/// Returns an error when the statement (or one of its clauses) is not
/// supported, or when planning of a nested query or expression fails.
fn sql_statement_to_plan_with_context_impl(
    &self,
    statement: Statement,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    match statement {
        Statement::ExplainTable {
            // only parse 'DESCRIBE table_name' and not 'EXPLAIN table_name'
            describe_alias: DescribeAlias::Describe,
            table_name,
            ..
        } => self.describe_table_to_plan(table_name),
        Statement::Explain {
            verbose,
            statement,
            analyze,
            format,
            describe_alias: _,
            ..
        } => {
            let format = format.map(|format| format.to_string());
            let statement = DFStatement::Statement(statement);
            self.explain_to_plan(verbose, analyze, format, statement)
        }
        Statement::Query(query) => self.query_to_plan(*query, planner_context),
        Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
        Statement::SetVariable {
            local,
            hivevar,
            variables,
            value,
        } => self.set_variable_to_plan(local, hivevar, &variables, value),
        // Every field is destructured explicitly (no `..`) so that each
        // clause is either planned or rejected below.
        Statement::CreateTable(CreateTable {
            temporary,
            external,
            global,
            transient,
            volatile,
            hive_distribution,
            hive_formats,
            file_format,
            location,
            query,
            name,
            columns,
            constraints,
            table_properties,
            with_options,
            if_not_exists,
            or_replace,
            without_rowid,
            like,
            clone,
            engine,
            comment,
            auto_increment_offset,
            default_charset,
            collation,
            on_commit,
            on_cluster,
            primary_key,
            order_by,
            partition_by,
            cluster_by,
            clustered_by,
            options,
            strict,
            copy_grants,
            enable_schema_evolution,
            change_tracking,
            data_retention_time_in_days,
            max_data_extension_time_in_days,
            default_ddl_collation,
            with_aggregation_policy,
            with_row_access_policy,
            with_tags,
            iceberg,
            external_volume,
            base_location,
            catalog,
            catalog_sync,
            storage_serialization_policy,
        }) if table_properties.is_empty() && with_options.is_empty() => {
            if temporary {
                return not_impl_err!("Temporary tables not supported");
            }
            if external {
                return not_impl_err!("External tables not supported");
            }
            if global.is_some() {
                return not_impl_err!("Global tables not supported");
            }
            if transient {
                return not_impl_err!("Transient tables not supported");
            }
            if volatile {
                return not_impl_err!("Volatile tables not supported");
            }
            if hive_distribution != ast::HiveDistributionStyle::NONE {
                return not_impl_err!(
                    "Hive distribution not supported: {hive_distribution:?}"
                );
            }
            // Accept both "no Hive format at all" and a Hive format whose
            // members are all unset; anything else is unsupported.
            if !matches!(
                hive_formats,
                None | Some(ast::HiveFormat {
                    row_format: None,
                    serde_properties: None,
                    storage: None,
                    location: None,
                })
            ) {
                return not_impl_err!("Hive formats not supported: {hive_formats:?}");
            }
            if file_format.is_some() {
                return not_impl_err!("File format not supported");
            }
            if location.is_some() {
                return not_impl_err!("Location not supported");
            }
            if without_rowid {
                return not_impl_err!("Without rowid not supported");
            }
            if like.is_some() {
                return not_impl_err!("Like not supported");
            }
            if clone.is_some() {
                return not_impl_err!("Clone not supported");
            }
            if engine.is_some() {
                return not_impl_err!("Engine not supported");
            }
            if comment.is_some() {
                return not_impl_err!("Comment not supported");
            }
            if auto_increment_offset.is_some() {
                return not_impl_err!("Auto increment offset not supported");
            }
            if default_charset.is_some() {
                return not_impl_err!("Default charset not supported");
            }
            if collation.is_some() {
                return not_impl_err!("Collation not supported");
            }
            if on_commit.is_some() {
                return not_impl_err!("On commit not supported");
            }
            if on_cluster.is_some() {
                return not_impl_err!("On cluster not supported");
            }
            if primary_key.is_some() {
                return not_impl_err!("Primary key not supported");
            }
            if order_by.is_some() {
                return not_impl_err!("Order by not supported");
            }
            if partition_by.is_some() {
                return not_impl_err!("Partition by not supported");
            }
            if cluster_by.is_some() {
                return not_impl_err!("Cluster by not supported");
            }
            if clustered_by.is_some() {
                return not_impl_err!("Clustered by not supported");
            }
            if options.is_some() {
                return not_impl_err!("Options not supported");
            }
            if strict {
                return not_impl_err!("Strict not supported");
            }
            if copy_grants {
                return not_impl_err!("Copy grants not supported");
            }
            if enable_schema_evolution.is_some() {
                return not_impl_err!("Enable schema evolution not supported");
            }
            if change_tracking.is_some() {
                return not_impl_err!("Change tracking not supported");
            }
            if data_retention_time_in_days.is_some() {
                return not_impl_err!("Data retention time in days not supported");
            }
            if max_data_extension_time_in_days.is_some() {
                return not_impl_err!("Max data extension time in days not supported");
            }
            if default_ddl_collation.is_some() {
                return not_impl_err!("Default DDL collation not supported");
            }
            if with_aggregation_policy.is_some() {
                return not_impl_err!("With aggregation policy not supported");
            }
            if with_row_access_policy.is_some() {
                return not_impl_err!("With row access policy not supported");
            }
            if with_tags.is_some() {
                return not_impl_err!("With tags not supported");
            }
            if iceberg {
                return not_impl_err!("Iceberg not supported");
            }
            if external_volume.is_some() {
                return not_impl_err!("External volume not supported");
            }
            if base_location.is_some() {
                return not_impl_err!("Base location not supported");
            }
            if catalog.is_some() {
                return not_impl_err!("Catalog not supported");
            }
            if catalog_sync.is_some() {
                return not_impl_err!("Catalog sync not supported");
            }
            if storage_serialization_policy.is_some() {
                return not_impl_err!("Storage serialization policy not supported");
            }

            // Merge inline constraints and existing constraints
            let mut all_constraints = constraints;
            let inline_constraints = calc_inline_constraints_from_columns(&columns);
            all_constraints.extend(inline_constraints);

            // Build column default values
            let column_defaults = self.build_column_defaults(&columns, planner_context)?;

            let has_columns = !columns.is_empty();
            let schema = self.build_schema(columns)?.to_dfschema_ref()?;
            if has_columns {
                planner_context.set_table_schema(Some(Arc::clone(&schema)));
            }

            // The table input is either the (possibly re-projected) query of
            // `CREATE TABLE ... AS ...` or an empty relation with the
            // declared schema.
            let plan = match query {
                Some(query) => {
                    let plan = self.query_to_plan(*query, planner_context)?;
                    if has_columns {
                        let input_schema = plan.schema();
                        if schema.fields().len() != input_schema.fields().len() {
                            return plan_err!(
                                "Mismatch: {} columns specified, but result has {} columns",
                                schema.fields().len(),
                                input_schema.fields().len()
                            );
                        }
                        // Cast and rename the query output, position by
                        // position, to the declared columns.
                        let project_exprs = schema
                            .fields()
                            .iter()
                            .zip(input_schema.fields())
                            .map(|(field, input_field)| {
                                cast(col(input_field.name()), field.data_type().clone())
                                    .alias(field.name())
                            })
                            .collect::<Vec<_>>();
                        LogicalPlanBuilder::from(plan)
                            .project(project_exprs)?
                            .build()?
                    } else {
                        plan
                    }
                }
                None => LogicalPlan::EmptyRelation(EmptyRelation {
                    produce_one_row: false,
                    schema,
                }),
            };

            let constraints = self
                .new_constraint_from_table_constraints(&all_constraints, plan.schema())?;
            Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                CreateMemoryTable {
                    name: self.object_name_to_table_reference(name)?,
                    constraints,
                    input: Arc::new(plan),
                    if_not_exists,
                    or_replace,
                    column_defaults,
                    temporary,
                },
            )))
        }
        Statement::CreateView {
            or_replace,
            materialized,
            name,
            columns,
            query,
            options: CreateTableOptions::None,
            cluster_by,
            comment,
            with_no_schema_binding,
            if_not_exists,
            temporary,
            to,
            params,
        } => {
            if materialized {
                return not_impl_err!("Materialized views not supported");
            }
            if !cluster_by.is_empty() {
                return not_impl_err!("Cluster by not supported");
            }
            if comment.is_some() {
                return not_impl_err!("Comment not supported");
            }
            if with_no_schema_binding {
                return not_impl_err!("With no schema binding not supported");
            }
            if if_not_exists {
                return not_impl_err!("If not exists not supported");
            }
            if to.is_some() {
                return not_impl_err!("To not supported");
            }

            // put the statement back together temporarily to get the SQL
            // string representation
            let stmt = Statement::CreateView {
                or_replace,
                materialized,
                name,
                columns,
                query,
                options: CreateTableOptions::None,
                cluster_by,
                comment,
                with_no_schema_binding,
                if_not_exists,
                temporary,
                to,
                params,
            };
            let sql = stmt.to_string();
            let Statement::CreateView {
                name,
                columns,
                query,
                or_replace,
                temporary,
                ..
            } = stmt
            else {
                return internal_err!("Unreachable code in create view");
            };

            let columns = columns
                .into_iter()
                .map(|view_column_def| {
                    if let Some(options) = view_column_def.options {
                        plan_err!("Options not supported for view columns: {options:?}")
                    } else {
                        Ok(view_column_def.name)
                    }
                })
                .collect::<Result<Vec<_>>>()?;

            // The view query is planned with a fresh context.
            let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
            plan = self.apply_expr_alias(plan, columns)?;

            Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name: self.object_name_to_table_reference(name)?,
                input: Arc::new(plan),
                or_replace,
                definition: Some(sql),
                temporary,
            })))
        }
        Statement::ShowCreate { obj_type, obj_name } => match obj_type {
            ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
            _ => {
                not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
            }
        },
        Statement::CreateSchema {
            schema_name,
            if_not_exists,
        } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
            CreateCatalogSchema {
                schema_name: get_schema_name(&schema_name),
                if_not_exists,
                schema: Arc::new(DFSchema::empty()),
            },
        ))),
        Statement::CreateDatabase {
            db_name,
            if_not_exists,
            ..
        } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
            CreateCatalog {
                catalog_name: object_name_to_string(&db_name),
                if_not_exists,
                schema: Arc::new(DFSchema::empty()),
            },
        ))),
        Statement::Drop {
            object_type,
            if_exists,
            mut names,
            cascade,
            restrict: _,
            purge: _,
            temporary: _,
        } => {
            // `restrict`, `purge` and `temporary` are ignored, and `cascade`
            // is only forwarded for `DROP SCHEMA`. Dropping multiple objects
            // in one statement is not supported.
            let name = match names.len() {
                0 => Err(ParserError("Missing table name.".to_string()).into()),
                // `unwrap` cannot fail: the vector holds exactly one name.
                1 => self.object_name_to_table_reference(names.pop().unwrap()),
                _ => Err(ParserError("Multiple objects not supported".to_string()).into()),
            }?;

            match object_type {
                ObjectType::Table => Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
                    name,
                    if_exists,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                }))),
                ObjectType::View => Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                    name,
                    if_exists,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                }))),
                ObjectType::Schema => {
                    // The name was resolved as a table reference, so shift
                    // the parts: `table` holds the schema name and `schema`
                    // holds the catalog name.
                    let name = match name {
                        TableReference::Bare { table } => {
                            Ok(SchemaReference::Bare { schema: table })
                        }
                        TableReference::Partial { schema, table } => {
                            Ok(SchemaReference::Full {
                                schema: table,
                                catalog: schema,
                            })
                        }
                        TableReference::Full {
                            catalog: _,
                            schema: _,
                            table: _,
                        } => Err(ParserError(
                            "Invalid schema specifier (has 3 parts)".to_string(),
                        )),
                    }?;
                    Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
                        DropCatalogSchema {
                            name,
                            if_exists,
                            cascade,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        },
                    )))
                }
                _ => not_impl_err!(
                    "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
                ),
            }
        }
        Statement::Prepare {
            name,
            data_types,
            statement,
        } => {
            // Convert parser data types to DataFusion data types
            let data_types: Vec<DataType> = data_types
                .into_iter()
                .map(|t| self.convert_data_type(&t))
                .collect::<Result<_>>()?;

            // Create planner context with parameters
            let mut planner_context =
                PlannerContext::new().with_prepare_param_data_types(data_types.clone());

            // Build logical plan for inner statement of the prepare statement
            let plan = self
                .sql_statement_to_plan_with_context_impl(*statement, &mut planner_context)?;
            Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
                name: ident_to_string(&name),
                data_types,
                input: Arc::new(plan),
            })))
        }
        Statement::Execute {
            name,
            parameters,
            using,
            // has_parentheses specifies the syntax, but the plan is the
            // same no matter the syntax used, so ignore it
            has_parentheses: _,
            immediate,
            into,
        } => {
            // `USING` is a MySQL-specific syntax and currently not supported.
            if !using.is_empty() {
                return not_impl_err!("Execute statement with USING is not supported");
            }
            if immediate {
                return not_impl_err!("Execute statement with IMMEDIATE is not supported");
            }
            if !into.is_empty() {
                return not_impl_err!("Execute statement with INTO is not supported");
            }
            // Report a missing name as an error instead of panicking.
            let Some(name) = name else {
                return plan_err!("Execute statement requires a prepared statement name");
            };

            let empty_schema = DFSchema::empty();
            let parameters = parameters
                .into_iter()
                .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
                .collect::<Result<Vec<Expr>>>()?;

            Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
                name: object_name_to_string(&name),
                parameters,
            })))
        }
        Statement::Deallocate {
            name,
            // Similar to PostgreSQL, the PREPARE keyword is ignored
            prepare: _,
        } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
            Deallocate {
                name: ident_to_string(&name),
            },
        ))),
        Statement::ShowTables {
            extended,
            full,
            terse,
            history,
            external,
            show_options,
        } => {
            // We only support the basic "SHOW TABLES"
            // https://github.com/apache/datafusion/issues/3188
            if extended {
                return not_impl_err!("SHOW TABLES EXTENDED not supported");
            }
            if full {
                return not_impl_err!("SHOW FULL TABLES not supported");
            }
            if terse {
                return not_impl_err!("SHOW TERSE TABLES not supported");
            }
            if history {
                return not_impl_err!("SHOW TABLES HISTORY not supported");
            }
            if external {
                return not_impl_err!("SHOW EXTERNAL TABLES not supported");
            }
            let ShowStatementOptions {
                show_in,
                starts_with,
                limit,
                limit_from,
                filter_position,
            } = show_options;
            if show_in.is_some() {
                return not_impl_err!("SHOW TABLES IN not supported");
            }
            if starts_with.is_some() {
                return not_impl_err!("SHOW TABLES LIKE not supported");
            }
            if limit.is_some() {
                return not_impl_err!("SHOW TABLES LIMIT not supported");
            }
            if limit_from.is_some() {
                return not_impl_err!("SHOW TABLES LIMIT FROM not supported");
            }
            if filter_position.is_some() {
                return not_impl_err!("SHOW TABLES FILTER not supported");
            }
            self.show_tables_to_plan()
        }
        Statement::ShowColumns {
            extended,
            full,
            show_options,
        } => {
            let ShowStatementOptions {
                show_in,
                starts_with,
                limit,
                limit_from,
                filter_position,
            } = show_options;
            if starts_with.is_some() {
                return not_impl_err!("SHOW COLUMNS LIKE not supported");
            }
            if limit.is_some() {
                return not_impl_err!("SHOW COLUMNS LIMIT not supported");
            }
            if limit_from.is_some() {
                return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported");
            }
            if filter_position.is_some() {
                return not_impl_err!("SHOW COLUMNS with WHERE or LIKE is not supported");
            }
            let Some(ShowStatementIn {
                // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
                // COLUMNS FROM` which is not different in DataFusion
                clause: _,
                parent_type,
                parent_name,
            }) = show_in
            else {
                return plan_err!("SHOW COLUMNS requires a table name");
            };

            if let Some(parent_type) = parent_type {
                return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
            }
            let Some(table_name) = parent_name else {
                return plan_err!("SHOW COLUMNS requires a table name");
            };

            self.show_columns_to_plan(extended, full, table_name)
        }
        Statement::ShowFunctions { filter, .. } => self.show_functions_to_plan(filter),
        Statement::Insert(Insert {
            or,
            into,
            columns,
            overwrite,
            source,
            partitioned,
            after_columns,
            table,
            on,
            returning,
            ignore,
            table_alias,
            mut replace_into,
            priority,
            insert_alias,
            assignments,
            has_table_keyword,
            settings,
            format_clause,
        }) => {
            let table_name = match table {
                TableObject::TableName(table_name) => table_name,
                TableObject::TableFunction(_) => {
                    return not_impl_err!("INSERT INTO Table functions not supported")
                }
            };
            // `INSERT OR REPLACE` is planned like `REPLACE INTO`; every
            // other conflict clause is rejected.
            if let Some(or) = or {
                match or {
                    SqliteOnConflict::Replace => replace_into = true,
                    _ => plan_err!("Inserts with {or} clause is not supported")?,
                }
            }
            if partitioned.is_some() {
                plan_err!("Partitioned inserts not yet supported")?;
            }
            if !after_columns.is_empty() {
                plan_err!("After-columns clause not supported")?;
            }
            if on.is_some() {
                plan_err!("Insert-on clause not supported")?;
            }
            if returning.is_some() {
                plan_err!("Insert-returning clause not supported")?;
            }
            if ignore {
                plan_err!("Insert-ignore clause not supported")?;
            }
            let Some(source) = source else {
                plan_err!("Inserts without a source not supported")?
            };
            if let Some(table_alias) = table_alias {
                plan_err!("Inserts with a table alias not supported: {table_alias:?}")?
            };
            if let Some(priority) = priority {
                plan_err!("Inserts with a `PRIORITY` clause not supported: {priority:?}")?
            };
            if insert_alias.is_some() {
                plan_err!("Inserts with an alias not supported")?;
            }
            if !assignments.is_empty() {
                plan_err!("Inserts with assignments not supported")?;
            }
            if settings.is_some() {
                plan_err!("Inserts with settings not supported")?;
            }
            if format_clause.is_some() {
                plan_err!("Inserts with format clause not supported")?;
            }
            // optional keywords don't change behavior
            let _ = into;
            let _ = has_table_keyword;
            self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
        }
        Statement::Update {
            table,
            assignments,
            from,
            selection,
            returning,
            or,
        } => {
            // The position of FROM relative to SET does not matter here.
            let froms = from.map(|update_table_from_kind| match update_table_from_kind {
                UpdateTableFromKind::BeforeSet(froms) => froms,
                UpdateTableFromKind::AfterSet(froms) => froms,
            });
            // TODO: support multiple tables in UPDATE SET FROM
            if froms.as_ref().is_some_and(|f| f.len() > 1) {
                plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
            }
            let update_from = froms.and_then(|mut f| f.pop());
            if returning.is_some() {
                plan_err!("Update-returning clause not yet supported")?;
            }
            if or.is_some() {
                plan_err!("ON conflict not supported")?;
            }
            self.update_to_plan(table, assignments, update_from, selection)
        }
        Statement::Delete(Delete {
            tables,
            using,
            selection,
            returning,
            from,
            order_by,
            limit,
        }) => {
            if !tables.is_empty() {
                plan_err!("DELETE <TABLE> not supported")?;
            }
            if using.is_some() {
                plan_err!("Using clause not supported")?;
            }
            if returning.is_some() {
                plan_err!("Delete-returning clause not yet supported")?;
            }
            if !order_by.is_empty() {
                plan_err!("Delete-order-by clause not yet supported")?;
            }
            if limit.is_some() {
                plan_err!("Delete-limit clause not yet supported")?;
            }

            let table_name = self.get_delete_target(from)?;
            self.delete_to_plan(table_name, selection)
        }
        // Only matches statements whose `begin` flag is false; the others
        // fall through to the catch-all arm.
        Statement::StartTransaction {
            modes,
            begin: false,
            modifier,
            transaction,
            statements,
            exception_statements,
            has_end_keyword,
        } => {
            if let Some(modifier) = modifier {
                return not_impl_err!("Transaction modifier not supported: {modifier}");
            }
            if !statements.is_empty() {
                return not_impl_err!("Transaction with multiple statements not supported");
            }
            if exception_statements.is_some() {
                return not_impl_err!("Transaction with exception statements not supported");
            }
            if has_end_keyword {
                return not_impl_err!("Transaction with END keyword not supported");
            }
            self.validate_transaction_kind(transaction)?;

            // When a mode is given several times the last occurrence wins;
            // the defaults are SERIALIZABLE and READ WRITE.
            let isolation_level: ast::TransactionIsolationLevel = modes
                .iter()
                .filter_map(|m: &TransactionMode| match m {
                    TransactionMode::AccessMode(_) => None,
                    TransactionMode::IsolationLevel(level) => Some(level),
                })
                .next_back()
                .copied()
                .unwrap_or(ast::TransactionIsolationLevel::Serializable);
            let access_mode: ast::TransactionAccessMode = modes
                .iter()
                .filter_map(|m: &TransactionMode| match m {
                    TransactionMode::AccessMode(mode) => Some(mode),
                    TransactionMode::IsolationLevel(_) => None,
                })
                .next_back()
                .copied()
                .unwrap_or(ast::TransactionAccessMode::ReadWrite);

            // Translate the parser enums into the logical plan enums.
            let isolation_level = match isolation_level {
                ast::TransactionIsolationLevel::ReadUncommitted => {
                    TransactionIsolationLevel::ReadUncommitted
                }
                ast::TransactionIsolationLevel::ReadCommitted => {
                    TransactionIsolationLevel::ReadCommitted
                }
                ast::TransactionIsolationLevel::RepeatableRead => {
                    TransactionIsolationLevel::RepeatableRead
                }
                ast::TransactionIsolationLevel::Serializable => {
                    TransactionIsolationLevel::Serializable
                }
                ast::TransactionIsolationLevel::Snapshot => {
                    TransactionIsolationLevel::Snapshot
                }
            };
            let access_mode = match access_mode {
                ast::TransactionAccessMode::ReadOnly => TransactionAccessMode::ReadOnly,
                ast::TransactionAccessMode::ReadWrite => TransactionAccessMode::ReadWrite,
            };
            let statement = PlanStatement::TransactionStart(TransactionStart {
                access_mode,
                isolation_level,
            });
            Ok(LogicalPlan::Statement(statement))
        }
        Statement::Commit {
            chain,
            end,
            modifier,
        } => {
            if end {
                return not_impl_err!("COMMIT AND END not supported");
            }
            if let Some(modifier) = modifier {
                return not_impl_err!("COMMIT {modifier} not supported");
            }
            let statement = PlanStatement::TransactionEnd(TransactionEnd {
                conclusion: TransactionConclusion::Commit,
                chain,
            });
            Ok(LogicalPlan::Statement(statement))
        }
        Statement::Rollback { chain, savepoint } => {
            if savepoint.is_some() {
                plan_err!("Savepoints not supported")?;
            }
            let statement = PlanStatement::TransactionEnd(TransactionEnd {
                conclusion: TransactionConclusion::Rollback,
                chain,
            });
            Ok(LogicalPlan::Statement(statement))
        }
        Statement::CreateFunction(ast::CreateFunction {
            or_replace,
            temporary,
            name,
            args,
            return_type,
            function_body,
            behavior,
            language,
            ..
        }) => {
            let return_type = return_type
                .map(|t| self.convert_data_type(&t))
                .transpose()?;

            // Default argument expressions are planned against an empty
            // schema with a context that has no parameter types.
            let mut planner_context = PlannerContext::new();
            let empty_schema = &DFSchema::empty();

            let args = match args {
                Some(function_args) => {
                    let function_args = function_args
                        .into_iter()
                        .map(|arg| {
                            let data_type = self.convert_data_type(&arg.data_type)?;

                            let default_expr = match arg.default_expr {
                                Some(expr) => Some(self.sql_to_expr(
                                    expr,
                                    empty_schema,
                                    &mut planner_context,
                                )?),
                                None => None,
                            };
                            Ok(OperateFunctionArg {
                                name: arg.name,
                                default_expr,
                                data_type,
                            })
                        })
                        .collect::<Result<Vec<OperateFunctionArg>>>()?;
                    Some(function_args)
                }
                None => None,
            };

            // At the moment functions can't be qualified `schema.name`
            let name = match &name.0[..] {
                [] => exec_err!("Function should have name")?,
                // Report a non-identifier name part as an error instead of
                // panicking.
                [n] => match n.as_ident() {
                    Some(ident) => ident.value.clone(),
                    None => not_impl_err!(
                        "Only plain identifiers are supported as function names"
                    )?,
                },
                [..] => not_impl_err!("Qualified functions are not supported")?,
            };

            //
            // Convert resulting expression to data fusion expression
            //
            // The body is planned with the argument types registered as
            // prepare-parameter types.
            let arg_types = args
                .as_ref()
                .map(|arg| arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>());
            let mut planner_context = PlannerContext::new()
                .with_prepare_param_data_types(arg_types.unwrap_or_default());

            let function_body = match function_body {
                Some(r) => Some(self.sql_to_expr(
                    match r {
                        ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
                        ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
                        ast::CreateFunctionBody::Return(expr) => expr,
                    },
                    &DFSchema::empty(),
                    &mut planner_context,
                )?),
                None => None,
            };

            let params = CreateFunctionBody {
                language,
                behavior: behavior.map(|b| match b {
                    ast::FunctionBehavior::Immutable => Volatility::Immutable,
                    ast::FunctionBehavior::Stable => Volatility::Stable,
                    ast::FunctionBehavior::Volatile => Volatility::Volatile,
                }),
                function_body,
            };

            let statement = DdlStatement::CreateFunction(CreateFunction {
                or_replace,
                temporary,
                name,
                return_type,
                args,
                params,
                schema: DFSchemaRef::new(DFSchema::empty()),
            });

            Ok(LogicalPlan::Ddl(statement))
        }
        Statement::DropFunction {
            if_exists,
            func_desc,
            ..
        } => {
            // According to postgresql documentation it can be only one function
            // specified in drop statement
            if let Some(desc) = func_desc.first() {
                // At the moment functions can't be qualified `schema.name`
                let name = match &desc.name.0[..] {
                    [] => exec_err!("Function should have name")?,
                    // Report a non-identifier name part as an error instead
                    // of panicking.
                    [n] => match n.as_ident() {
                        Some(ident) => ident.value.clone(),
                        None => not_impl_err!(
                            "Only plain identifiers are supported as function names"
                        )?,
                    },
                    [..] => not_impl_err!("Qualified functions are not supported")?,
                };
                let statement = DdlStatement::DropFunction(DropFunction {
                    if_exists,
                    name,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                });
                Ok(LogicalPlan::Ddl(statement))
            } else {
                exec_err!("Function name not provided")
            }
        }
        Statement::CreateIndex(CreateIndex {
            name,
            table_name,
            using,
            columns,
            unique,
            if_not_exists,
            ..
        }) => {
            let name: Option<String> = name.as_ref().map(object_name_to_string);
            let table = self.object_name_to_table_reference(table_name)?;
            // Index columns are resolved against the schema of the indexed
            // table.
            let table_schema = self
                .context_provider
                .get_table_source(table.clone())?
                .schema()
                .to_dfschema_ref()?;
            let using: Option<String> = using.as_ref().map(ident_to_string);
            let columns = self.order_by_to_sort_expr(
                columns,
                &table_schema,
                planner_context,
                false,
                None,
            )?;
            Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
                PlanCreateIndex {
                    name,
                    table,
                    using,
                    columns,
                    unique,
                    if_not_exists,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                },
            )))
        }
        stmt => {
            not_impl_err!("Unsupported SQL statement: {stmt}")
        }
    }
}