in spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala [84:166]
/**
 * Rewrites resolved, aligned MERGE INTO nodes whose last extractor field is still `None`.
 *
 * Three shapes are handled, tried in this order:
 *  - no MATCHED actions and exactly one NOT MATCHED action: a left anti join of the
 *    (optionally filtered) source against the target, projected and appended by position;
 *  - no MATCHED actions otherwise: the same anti join fed through `MergeRows`, then
 *    appended by position;
 *  - anything else: a row-level plan is built and stored in `rewritePlan` on a copy of
 *    the merge node.
 *
 * @param plan the logical plan to transform
 * @return the result of applying the cases below through `resolveOperators`
 * @throws AnalysisException if the merge target, after subquery aliases are eliminated,
 *                           does not match the relation pattern of the selected case
 */
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  case merge @ MergeIntoIcebergTable(target, source, joinCond, matched, notMatched, None)
      if merge.resolved && merge.aligned && matched.isEmpty && notMatched.size == 1 =>

    EliminateSubqueryAliases(target) match {
      case relation: DataSourceV2Relation =>
        // the guard above guarantees exactly one NOT MATCHED action
        val insert = notMatched.head.asInstanceOf[InsertAction]

        // a NOT MATCHED condition may only reference source columns, so it can be
        // applied to the source before the join
        val candidates: LogicalPlan =
          insert.condition.map(Filter(_, source)).getOrElse(source)

        // without MATCHED actions a left anti join drops every source row that has a
        // match in the target; the survivors are written with a plain append rather
        // than a row-level merge
        val antiJoin = Join(candidates, relation, LeftAnti, Some(joinCond), JoinHint.NONE)

        // pair each assigned value with the target column name at the same position
        val projectList = insert.assignments.zip(relation.output).map {
          case (assignment, targetAttr) => Alias(assignment.value, targetAttr.name)()
        }

        AppendData.byPosition(relation, Project(projectList, antiJoin))

      case p =>
        throw new AnalysisException(s"$p is not an Iceberg table")
    }

  case merge @ MergeIntoIcebergTable(target, source, joinCond, matched, notMatched, None)
      if merge.resolved && merge.aligned && matched.isEmpty =>

    EliminateSubqueryAliases(target) match {
      case relation: DataSourceV2Relation =>
        // same anti-join-plus-append strategy as the single-action case, except that
        // the per-action conditions are evaluated by MergeRows instead of a Filter
        val antiJoin = Join(source, relation, LeftAnti, Some(joinCond), JoinHint.NONE)

        val insertConditions = notMatched.map(actionCondition)
        val insertOutputs = notMatched.map(notMatchedActionOutput(_, Nil))
        val mergeOutput = buildMergeRowsOutput(Nil, insertOutputs, relation.output)

        // several NOT MATCHED actions may exist, so route rows through MergeRows;
        // only the source side is marked as present and no MATCHED branches are passed
        val mergedRows = MergeRows(
          isSourceRowPresent = TrueLiteral,
          isTargetRowPresent = FalseLiteral,
          matchedConditions = Nil,
          matchedOutputs = Nil,
          notMatchedConditions = insertConditions,
          notMatchedOutputs = insertOutputs,
          targetOutput = Nil,
          performCardinalityCheck = false,
          emitNotMatchedTargetRows = false,
          output = mergeOutput,
          antiJoin)

        AppendData.byPosition(relation, mergedRows)

      case p =>
        throw new AnalysisException(s"$p is not an Iceberg table")
    }

  case merge @ MergeIntoIcebergTable(target, source, joinCond, matched, notMatched, None)
      if merge.resolved && merge.aligned =>

    EliminateSubqueryAliases(target) match {
      case relation @ DataSourceV2Relation(rowLevelTable: SupportsRowLevelOperations, _, _, _, _) =>
        val operationTable =
          buildOperationTable(rowLevelTable, MERGE, CaseInsensitiveStringMap.empty())

        // pick the plan builder according to whether the operation is a SupportsDelta
        val rowLevelPlan = operationTable.operation match {
          case _: SupportsDelta =>
            buildWriteDeltaPlan(relation, operationTable, source, joinCond, matched, notMatched)
          case _ =>
            buildReplaceDataPlan(relation, operationTable, source, joinCond, matched, notMatched)
        }

        merge.copy(rewritePlan = Some(rowLevelPlan))

      case p =>
        throw new AnalysisException(s"$p is not an Iceberg table")
    }
}