fn insert_to_plan()

in datafusion/sql/src/statement.rs [929:1041]


    /// Plans an `INSERT` statement as a [`LogicalPlan::Dml`] node.
    ///
    /// The work is done in four steps:
    ///
    /// 1. Resolve `table_name` through the schema provider and read the
    ///    target table's schema.
    /// 2. Match the explicit insert column list (`columns`) against the table
    ///    schema. An empty list means "all table columns, in table order".
    /// 3. If the body of `source` is a `VALUES` clause, record a data type for
    ///    every `$N` placeholder, taken from the insert column at the same
    ///    position within its row.
    /// 4. Plan `source`, then project it so that every inserted column is cast
    ///    to the target column's type and aliased to the target column's name.
    ///    The projection is emitted in table-column order; table columns that
    ///    are not listed in `columns` are not part of the projection.
    ///
    /// `overwrite` selects `WriteOp::InsertOverwrite` instead of
    /// `WriteOp::InsertInto`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * the table reference or table provider cannot be resolved,
    /// * a name in `columns` is not a column of the table, or is listed twice,
    /// * a placeholder in a `VALUES` row is not of the form `$N` with `N >= 1`,
    ///   or sits at a position with no matching insert column,
    /// * planning `source` fails,
    /// * the number of columns produced by `source` differs from the number of
    ///   insert columns,
    /// * a cast expression or the final projection cannot be built.
    fn insert_to_plan(
        &self,
        table_name: ObjectName,
        columns: Vec<Ident>,
        source: Box<Query>,
        overwrite: bool,
    ) -> Result<LogicalPlan> {
        // Do a table lookup to verify the table exists
        let table_name = self.object_name_to_table_reference(table_name)?;
        let provider = self
            .schema_provider
            .get_table_provider(table_name.clone())?;
        let arrow_schema = (*provider.schema()).clone();
        let table_schema = DFSchema::try_from(arrow_schema)?;

        // Get insert fields and index_mapping.
        // `index_mapping[i]` is `Some(j)` when the i-th table column is fed by
        // `fields[j]`, and `None` when that table column is not being inserted.
        let (fields, index_mapping) = if columns.is_empty() {
            // Empty means we're inserting into all columns of the table
            (
                table_schema.fields().clone(),
                (0..table_schema.fields().len())
                    .map(Some)
                    .collect::<Vec<_>>(),
            )
        } else {
            let mut mapping = vec![None; table_schema.fields().len()];
            let fields = columns
                .into_iter()
                .map(|c| self.normalizer.normalize(c))
                .enumerate()
                .map(|(i, c)| {
                    let column_index = table_schema
                        .index_of_column_by_name(None, &c)?
                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
                    // A table column may be named at most once in the list.
                    if mapping[column_index].is_some() {
                        return Err(DataFusionError::SchemaError(
                            datafusion_common::SchemaError::DuplicateUnqualifiedField {
                                name: c,
                            },
                        ));
                    }
                    mapping[column_index] = Some(i);
                    Ok(table_schema.field(column_index).clone())
                })
                .collect::<Result<Vec<DFField>>>()?;
            (fields, mapping)
        };

        // infer types for Values clause... other types should be resolvable the regular way
        let mut prepare_param_data_types = BTreeMap::new();
        // Only a borrow is needed to inspect the rows, so the query body is
        // not cloned.
        if let SetExpr::Values(ast::Values { rows, .. }) = &*source.body {
            for row in rows {
                for (idx, val) in row.iter().enumerate() {
                    if let ast::Expr::Value(Value::Placeholder(name)) = val {
                        let param_number =
                            name.replace('$', "").parse::<usize>().map_err(|_| {
                                DataFusionError::Plan(format!(
                                    "Can't parse placeholder: {name}"
                                ))
                            })?;
                        // Placeholders are 1-based. `$0` has no 0-based slot,
                        // so reject it instead of underflowing on `- 1`.
                        let param_index =
                            param_number.checked_sub(1).ok_or_else(|| {
                                DataFusionError::Plan(format!(
                                    "Invalid placeholder, zero is not a valid index: {name}"
                                ))
                            })?;
                        // The value at position `idx` of a row feeds the
                        // `idx`-th insert column. Report the placeholder's own
                        // number in the error, not its position in the row.
                        let field = fields.get(idx).ok_or_else(|| {
                            DataFusionError::Plan(format!(
                                "Placeholder ${} refers to a non existent column",
                                param_number
                            ))
                        })?;
                        let dt = field.field().data_type().clone();
                        let _ = prepare_param_data_types.insert(param_index, dt);
                    }
                }
            }
        }
        // The map is keyed by 0-based placeholder index, so the collected
        // types come out in ascending placeholder order.
        let prepare_param_data_types = prepare_param_data_types.into_values().collect();

        // Projection
        let mut planner_context =
            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
        let source = self.query_to_plan(*source, &mut planner_context)?;
        if fields.len() != source.schema().fields().len() {
            return plan_err!("Column count doesn't match insert query!");
        }

        let exprs = index_mapping
            .into_iter()
            .flatten()
            .map(|i| {
                let target_field = &fields[i];
                let source_field = source.schema().field(i);
                // The source field is selected by position, so reference it
                // with its qualifier: an unqualified reference can be
                // ambiguous when the source yields same-named columns from
                // different relations.
                let expr =
                    datafusion_expr::Expr::Column(source_field.qualified_column())
                        .cast_to(target_field.data_type(), source.schema())?
                        .alias(target_field.name());
                Ok(expr)
            })
            .collect::<Result<Vec<datafusion_expr::Expr>>>()?;
        let source = project(source, exprs)?;

        let op = if overwrite {
            WriteOp::InsertOverwrite
        } else {
            WriteOp::InsertInto
        };

        Ok(LogicalPlan::Dml(DmlStatement {
            table_name,
            table_schema: Arc::new(table_schema),
            op,
            input: Arc::new(source),
        }))
    }