in datafusion/sql/src/statement.rs [929:1041]
/// Plans an `INSERT` statement as a [`LogicalPlan::Dml`] node.
///
/// # Arguments
///
/// * `table_name` - name of the target table; it is resolved through the
///   schema provider, so the table must exist.
/// * `columns` - explicit insert column list; an empty list means "all
///   columns of the table, in table order".
/// * `source` - the query (or `VALUES` list) producing the rows to insert.
/// * `overwrite` - selects [`WriteOp::InsertOverwrite`] when `true`,
///   [`WriteOp::InsertInto`] otherwise.
///
/// # Errors
///
/// Returns an error when:
/// * the table reference or table provider lookup fails,
/// * a listed column is not part of the table schema, or is listed twice,
/// * a `VALUES` placeholder cannot be parsed as `$<n>`, is `$0`, or sits at
///   a position that has no corresponding insert column,
/// * the source query produces a different number of columns than the
///   insert column list,
/// * planning the source query, casting, or building the projection fails.
fn insert_to_plan(
    &self,
    table_name: ObjectName,
    columns: Vec<Ident>,
    source: Box<Query>,
    overwrite: bool,
) -> Result<LogicalPlan> {
    // Do a table lookup to verify the table exists
    let table_name = self.object_name_to_table_reference(table_name)?;
    let provider = self
        .schema_provider
        .get_table_provider(table_name.clone())?;
    let arrow_schema = (*provider.schema()).clone();
    let table_schema = DFSchema::try_from(arrow_schema)?;

    // Get insert fields and index_mapping
    // The i-th field of the table is `fields[index_mapping[i]]`
    let (fields, index_mapping) = if columns.is_empty() {
        // Empty means we're inserting into all columns of the table
        (
            table_schema.fields().clone(),
            (0..table_schema.fields().len())
                .map(Some)
                .collect::<Vec<_>>(),
        )
    } else {
        // One slot per table column; `None` marks a column that was not
        // named in the insert column list.
        let mut mapping = vec![None; table_schema.fields().len()];
        let fields = columns
            .into_iter()
            .map(|c| self.normalizer.normalize(c))
            .enumerate()
            .map(|(i, c)| {
                let column_index = table_schema
                    .index_of_column_by_name(None, &c)?
                    .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
                // A slot that is already filled means the same column was
                // listed more than once.
                if mapping[column_index].is_some() {
                    return Err(DataFusionError::SchemaError(
                        datafusion_common::SchemaError::DuplicateUnqualifiedField {
                            name: c,
                        },
                    ));
                }
                mapping[column_index] = Some(i);
                Ok(table_schema.field(column_index).clone())
            })
            .collect::<Result<Vec<DFField>>>()?;
        (fields, mapping)
    };

    // infer types for Values clause... other types should be resolvable the regular way
    let mut prepare_param_data_types = BTreeMap::new();
    // Borrow the body instead of cloning it: it is only inspected here, and
    // the borrow ends before `source` is consumed by `query_to_plan` below.
    if let SetExpr::Values(ast::Values { rows, .. }) = &*source.body {
        for row in rows {
            for (idx, val) in row.iter().enumerate() {
                if let ast::Expr::Value(Value::Placeholder(name)) = val {
                    let position = name
                        .replace('$', "")
                        .parse::<usize>()
                        .map_err(|_| {
                            DataFusionError::Plan(format!(
                                "Can't parse placeholder: {name}"
                            ))
                        })?
                        // Placeholders are 1-based. A plain `- 1` would
                        // underflow on `$0` (panic in debug builds, wrap to
                        // `usize::MAX` in release builds), so reject it.
                        .checked_sub(1)
                        .ok_or_else(|| {
                            DataFusionError::Plan(format!(
                                "Invalid placeholder, zero is not a valid index: {name}"
                            ))
                        })?;
                    // The placeholder takes the type of the insert column
                    // at the same position within the row.
                    let field = fields.get(idx).ok_or_else(|| {
                        DataFusionError::Plan(format!(
                            "Placeholder ${} refers to a non existent column",
                            idx + 1
                        ))
                    })?;
                    let dt = field.field().data_type().clone();
                    let _ = prepare_param_data_types.insert(position, dt);
                }
            }
        }
    }
    // `BTreeMap::into_values` yields the types ordered by placeholder index.
    let prepare_param_data_types = prepare_param_data_types.into_values().collect();

    // Projection
    let mut planner_context =
        PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
    let source = self.query_to_plan(*source, &mut planner_context)?;
    if fields.len() != source.schema().fields().len() {
        plan_err!("Column count doesn't match insert query!")?;
    }
    // Walk the table columns in table order; `flatten` skips columns that
    // were not named in the insert list, so they get no projection expression.
    let exprs = index_mapping
        .into_iter()
        .flatten()
        .map(|i| {
            let target_field = &fields[i];
            let source_field = source.schema().field(i);
            let expr =
                datafusion_expr::Expr::Column(source_field.unqualified_column())
                    .cast_to(target_field.data_type(), source.schema())?
                    .alias(target_field.name());
            Ok(expr)
        })
        .collect::<Result<Vec<datafusion_expr::Expr>>>()?;
    let source = project(source, exprs)?;

    let op = if overwrite {
        WriteOp::InsertOverwrite
    } else {
        WriteOp::InsertInto
    };
    let plan = LogicalPlan::Dml(DmlStatement {
        table_name,
        table_schema: Arc::new(table_schema),
        op,
        input: Arc::new(source),
    });
    Ok(plan)
}