in datafusion/sql/src/statement.rs [102:485]
fn sql_statement_to_plan_with_context(
&self,
statement: Statement,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let sql = Some(statement.to_string());
match statement {
Statement::Explain {
verbose,
statement,
analyze,
format: _,
describe_alias: _,
..
} => self.explain_statement_to_plan(verbose, analyze, *statement),
Statement::Query(query) => self.query_to_plan(*query, planner_context),
Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
Statement::SetVariable {
local,
hivevar,
variable,
value,
} => self.set_variable_to_plan(local, hivevar, &variable, value),
Statement::CreateTable {
query,
name,
columns,
constraints,
table_properties,
with_options,
if_not_exists,
or_replace,
..
} if table_properties.is_empty() && with_options.is_empty() => match query {
Some(query) => {
let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema();
let plan = if !columns.is_empty() {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
if schema.fields().len() != input_schema.fields().len() {
return plan_err!(
"Mismatch: {} columns specified, but result has {} columns",
schema.fields().len(),
input_schema.fields().len()
);
}
let input_fields = input_schema.fields();
let project_exprs = schema
.fields()
.iter()
.zip(input_fields)
.map(|(field, input_field)| {
cast(col(input_field.name()), field.data_type().clone())
.alias(field.name())
})
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan.clone())
.project(project_exprs)?
.build()?
} else {
plan
};
let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}
None => {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let plan = EmptyRelation {
produce_one_row: false,
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
let constraints = Constraints::new_from_table_constraints(
&constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
},
)))
}
},
Statement::CreateView {
or_replace,
name,
columns,
query,
with_options,
..
} if with_options.is_empty() => {
let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
plan = self.apply_expr_alias(plan, columns)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
name: self.object_name_to_table_reference(name)?,
input: Arc::new(plan),
or_replace,
definition: sql,
})))
}
Statement::ShowCreate { obj_type, obj_name } => match obj_type {
ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
_ => Err(DataFusionError::NotImplemented(
"Only `SHOW CREATE TABLE ...` statement is supported".to_string(),
)),
},
Statement::CreateSchema {
schema_name,
if_not_exists,
} => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
CreateCatalogSchema {
schema_name: get_schema_name(&schema_name),
if_not_exists,
schema: Arc::new(DFSchema::empty()),
},
))),
Statement::CreateDatabase {
db_name,
if_not_exists,
..
} => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
CreateCatalog {
catalog_name: object_name_to_string(&db_name),
if_not_exists,
schema: Arc::new(DFSchema::empty()),
},
))),
Statement::Drop {
object_type,
if_exists,
mut names,
cascade,
restrict: _,
purge: _,
} => {
// We don't support cascade and purge for now.
// nor do we support multiple object names
let name = match names.len() {
0 => Err(ParserError("Missing table name.".to_string()).into()),
1 => self.object_name_to_table_reference(names.pop().unwrap()),
_ => {
Err(ParserError("Multiple objects not supported".to_string())
.into())
}
}?;
match object_type {
ObjectType::Table => {
Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
name,
if_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
})))
}
ObjectType::View => {
Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
name,
if_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
})))
}
ObjectType::Schema => {
let name = match name {
TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table } ) ,
TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table,catalog: schema }),
TableReference::Full { catalog: _, schema: _, table: _ } => {
Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
},
}?;
Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
name,
if_exists,
cascade,
schema: DFSchemaRef::new(DFSchema::empty()),
})))},
_ => Err(DataFusionError::NotImplemented(
"Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
.to_string(),
)),
}
}
Statement::Prepare {
name,
data_types,
statement,
} => {
// Convert parser data types to DataFusion data types
let data_types: Vec<DataType> = data_types
.into_iter()
.map(|t| self.convert_data_type(&t))
.collect::<Result<_>>()?;
// Create planner context with parameters
let mut planner_context = PlannerContext::new()
.with_prepare_param_data_types(data_types.clone());
// Build logical plan for inner statement of the prepare statement
let plan = self.sql_statement_to_plan_with_context(
*statement,
&mut planner_context,
)?;
Ok(LogicalPlan::Prepare(Prepare {
name: ident_to_string(&name),
data_types,
input: Arc::new(plan),
}))
}
Statement::ShowTables {
extended,
full,
db_name,
filter,
} => self.show_tables_to_plan(extended, full, db_name, filter),
Statement::ShowColumns {
extended,
full,
table_name,
filter,
} => self.show_columns_to_plan(extended, full, table_name, filter),
Statement::Insert {
or,
into,
table_name,
columns,
overwrite,
source,
partitioned,
after_columns,
table,
on,
returning,
} => {
if or.is_some() {
plan_err!("Inserts with or clauses not supported")?;
}
if partitioned.is_some() {
plan_err!("Partitioned inserts not yet supported")?;
}
if !after_columns.is_empty() {
plan_err!("After-columns clause not supported")?;
}
if table {
plan_err!("Table clause not supported")?;
}
if on.is_some() {
plan_err!("Insert-on clause not supported")?;
}
if returning.is_some() {
plan_err!("Insert-returning clause not supported")?;
}
let _ = into; // optional keyword doesn't change behavior
self.insert_to_plan(table_name, columns, source, overwrite)
}
Statement::Update {
table,
assignments,
from,
selection,
returning,
} => {
if returning.is_some() {
plan_err!("Update-returning clause not yet supported")?;
}
self.update_to_plan(table, assignments, from, selection)
}
Statement::Delete {
tables,
using,
selection,
returning,
from,
} => {
if !tables.is_empty() {
plan_err!("DELETE <TABLE> not supported")?;
}
if using.is_some() {
plan_err!("Using clause not supported")?;
}
if returning.is_some() {
plan_err!("Delete-returning clause not yet supported")?;
}
let table_name = self.get_delete_target(from)?;
self.delete_to_plan(table_name, selection)
}
Statement::StartTransaction { modes } => {
let isolation_level: ast::TransactionIsolationLevel = modes
.iter()
.filter_map(|m: &ast::TransactionMode| match m {
TransactionMode::AccessMode(_) => None,
TransactionMode::IsolationLevel(level) => Some(level),
})
.last()
.copied()
.unwrap_or(ast::TransactionIsolationLevel::Serializable);
let access_mode: ast::TransactionAccessMode = modes
.iter()
.filter_map(|m: &ast::TransactionMode| match m {
TransactionMode::AccessMode(mode) => Some(mode),
TransactionMode::IsolationLevel(_) => None,
})
.last()
.copied()
.unwrap_or(ast::TransactionAccessMode::ReadWrite);
let isolation_level = match isolation_level {
ast::TransactionIsolationLevel::ReadUncommitted => {
TransactionIsolationLevel::ReadUncommitted
}
ast::TransactionIsolationLevel::ReadCommitted => {
TransactionIsolationLevel::ReadCommitted
}
ast::TransactionIsolationLevel::RepeatableRead => {
TransactionIsolationLevel::RepeatableRead
}
ast::TransactionIsolationLevel::Serializable => {
TransactionIsolationLevel::Serializable
}
};
let access_mode = match access_mode {
ast::TransactionAccessMode::ReadOnly => {
TransactionAccessMode::ReadOnly
}
ast::TransactionAccessMode::ReadWrite => {
TransactionAccessMode::ReadWrite
}
};
let statement = PlanStatement::TransactionStart(TransactionStart {
access_mode,
isolation_level,
schema: DFSchemaRef::new(DFSchema::empty()),
});
Ok(LogicalPlan::Statement(statement))
}
Statement::Commit { chain } => {
let statement = PlanStatement::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Commit,
chain,
schema: DFSchemaRef::new(DFSchema::empty()),
});
Ok(LogicalPlan::Statement(statement))
}
Statement::Rollback { chain } => {
let statement = PlanStatement::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Rollback,
chain,
schema: DFSchemaRef::new(DFSchema::empty()),
});
Ok(LogicalPlan::Statement(statement))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL statement: {sql:?}"
))),
}
}