in backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala [660:961]
private def writeAllChanges(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile]
): Seq[FileAction] = recordMergeOperation(sqlMetricName = "rewriteTimeMs") {
import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral}
val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)
var targetOutputCols = getTargetOutputCols(deltaTxn)
var outputRowSchema = deltaTxn.metadata.schema
// When we have duplicate matches (which are only allowed when the sole matched clause is a
// delete with no condition) we will incorrectly generate duplicate CDC rows.
// Duplicate matches can be due to:
// - Duplicate rows in the source w.r.t. the merge condition
// - A target-only or source-only merge condition, which essentially turns our join into a cross
// join with the target/source satisfying the merge condition.
// These duplicate matches are dropped from the main data output since this is a delete
// operation, but the duplicate CDC rows are not removed by default.
// See https://github.com/delta-io/delta/issues/1274
// We address this specific scenario by adding row ids to the target before performing our join.
// There should only be one CDC delete row per target row so we can use these row ids to dedupe
// the duplicate CDC delete rows.
// We also need to address the scenario when there are duplicate matches with delete and we
// insert duplicate rows. Here we need to additionally add row ids to the source before the
// join to avoid dropping these valid duplicate inserted rows and their corresponding cdc rows.
// When there is an insert clause, we set SOURCE_ROW_ID_COL=null for all delete rows because we
// need to drop the duplicate matches.
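// For illustration (hypothetical data): in a delete-only merge where two source rows both match
// target row t1, the main data output drops t1 only once, but without deduplication the CDC
// output would contain two CDC_TYPE_DELETE rows for t1. Tagging t1 with TARGET_ROW_ID_COL before
// the join lets us keep exactly one of those CDC delete rows.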
val isDeleteWithDuplicateMatchesAndCdc = multipleMatchDeleteOnlyOvercount.nonEmpty && cdcEnabled
// Generate a new target dataframe that has the same output attribute exprIds as the target plan.
// This allows us to apply the existing resolved update/insert expressions.
val baseTargetDF = buildTargetPlanWithFiles(spark, deltaTxn, filesToRewrite)
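// Choose the join type: when the merge has no insert clauses, unmatched source rows can never
// produce output rows, so a right outer join that preserves all target rows is sufficient when
// MERGE_MATCHED_ONLY_ENABLED is set; otherwise a full outer join is required.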
val joinType = if (hasNoInserts &&
spark.conf.get(DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED)) {
"rightOuter"
} else {
"fullOuter"
}
logDebug(s"""writeAllChanges using $joinType join:
| source.output: ${source.outputSet}
| target.output: ${target.outputSet}
| condition: $condition
| newTarget.output: ${baseTargetDF.queryExecution.logical.outputSet}
""".stripMargin)
// UDFs to update metrics
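// Each metric-update expression increments the named SQL metric as a side effect and evaluates
// to true, which is why clauseOutput below can combine two of them with And to update both an
// aggregate metric and a per-clause metric for the same row.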
val incrSourceRowCountExpr = makeMetricUpdateUDF("numSourceRowsInSecondScan")
val incrUpdatedCountExpr = makeMetricUpdateUDF("numTargetRowsUpdated")
val incrUpdatedMatchedCountExpr = makeMetricUpdateUDF("numTargetRowsMatchedUpdated")
val incrUpdatedNotMatchedBySourceCountExpr =
makeMetricUpdateUDF("numTargetRowsNotMatchedBySourceUpdated")
val incrInsertedCountExpr = makeMetricUpdateUDF("numTargetRowsInserted")
val incrNoopCountExpr = makeMetricUpdateUDF("numTargetRowsCopied")
val incrDeletedCountExpr = makeMetricUpdateUDF("numTargetRowsDeleted")
val incrDeletedMatchedCountExpr = makeMetricUpdateUDF("numTargetRowsMatchedDeleted")
val incrDeletedNotMatchedBySourceCountExpr =
makeMetricUpdateUDF("numTargetRowsNotMatchedBySourceDeleted")
// Apply an outer join to find both matches and non-matches. We are adding two boolean fields
// with value `true`, one to each side of the join. Whether this field is null or not after
// the outer join will allow us to identify whether the resultant joined row was a
// matched inner result or an unmatched result with null on one side.
// We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate
// matches and CDC is enabled, and additionally add row IDs to the source if we also have an
// insert clause. See above at isDeleteWithDuplicateMatchesAndCdc definition for more details.
var sourceDF = getSourceDF()
.withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
var targetDF = baseTargetDF
.withColumn(TARGET_ROW_PRESENT_COL, lit(true))
if (isDeleteWithDuplicateMatchesAndCdc) {
targetDF = targetDF.withColumn(TARGET_ROW_ID_COL, monotonically_increasing_id())
if (notMatchedClauses.nonEmpty) { // insert clause
sourceDF = sourceDF.withColumn(SOURCE_ROW_ID_COL, monotonically_increasing_id())
}
}
val joinedDF = sourceDF.join(targetDF, new Column(condition), joinType)
val joinedPlan = joinedDF.queryExecution.analyzed
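// Helper to resolve attribute references in the given expressions against the joined plan's
// output, so that update/insert expressions and clause conditions can be evaluated on joined rows.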
def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = {
tryResolveReferencesForExpressions(spark, exprs, joinedPlan)
}
// ==== Generate the expressions to process full-outer join output and generate target rows ====
// If there are N columns in the target table, there will be N + 3 columns after processing
// - N columns for target table
// - ROW_DROPPED_COL to define whether the generated row should be dropped or written
// - INCR_ROW_COUNT_COL containing a UDF to update the output row counter
// - CDC_TYPE_COLUMN_NAME containing the type of change being performed in a particular row
// To generate these N + 3 columns, we will generate N + 3 expressions and apply them to the
// rows in the joinedDF. The CDC column will be either used for CDC generation or dropped before
// performing the final write, and the other two will always be dropped after executing the
// metrics UDF and filtering on ROW_DROPPED_COL.
// We produce rows for both the main table data (with CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC),
// and rows for the CDC data which will be output to CDCReader.CDC_LOCATION.
// See [[CDCReader]] for general details on how partitioning on the CDC type column works.
// In the following functions `updateOutput`, `deleteOutput` and `insertOutput`, we
// produce a Seq[Expression] for each intended output row.
// Depending on the clause and whether CDC is enabled, we output between 0 and 3 rows, as a
// Seq[Seq[Expression]]
// There is one corner case outlined above at isDeleteWithDuplicateMatchesAndCdc definition.
// When we have a delete-ONLY merge with duplicate matches we have N + 4 columns:
// N target cols, TARGET_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME
// When we have a delete-when-matched merge with duplicate matches + an insert clause, we have
// N + 5 columns:
// N target cols, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL,
// CDC_TYPE_COLUMN_NAME
// These ROW_ID_COL will always be dropped before the final write.
if (isDeleteWithDuplicateMatchesAndCdc) {
targetOutputCols = targetOutputCols :+ UnresolvedAttribute(TARGET_ROW_ID_COL)
outputRowSchema = outputRowSchema.add(TARGET_ROW_ID_COL, DataTypes.LongType)
if (notMatchedClauses.nonEmpty) { // there is an insert clause, make SOURCE_ROW_ID_COL=null
targetOutputCols = targetOutputCols :+ Alias(Literal(null), SOURCE_ROW_ID_COL)()
outputRowSchema = outputRowSchema.add(SOURCE_ROW_ID_COL, DataTypes.LongType)
}
}
if (cdcEnabled) {
outputRowSchema = outputRowSchema
.add(ROW_DROPPED_COL, DataTypes.BooleanType)
.add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
.add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
}
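// The three helpers below (updateOutput, deleteOutput, insertOutput) each return one
// Seq[Expression] per output row to generate for a clause: always a main data row, plus extra
// CDC rows (update preimage/postimage, delete, or insert) when CDC is enabled.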
def updateOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression)
: Seq[Seq[Expression]] = {
val updateExprs = {
// Generate update expressions and set ROW_DROPPED_COL = false and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
val mainDataOutput = resolvedActions.map(_.expr) :+ FalseLiteral :+
incrMetricExpr :+ CDC_TYPE_NOT_CDC
if (cdcEnabled) {
// For update preimage, we have to do a no-op copy with ROW_DROPPED_COL = false,
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_PREIMAGE and INCR_ROW_COUNT_COL as a no-op
// (because the metric will be incremented in `mainDataOutput`)
val preImageOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+
Literal(CDC_TYPE_UPDATE_PREIMAGE)
// For update postimage, we have the same expressions as for mainDataOutput but with
// INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
// `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_POSTIMAGE
val postImageOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+
Literal(CDC_TYPE_UPDATE_POSTIMAGE)
Seq(mainDataOutput, preImageOutput, postImageOutput)
} else {
Seq(mainDataOutput)
}
}
updateExprs.map(resolveOnJoinedPlan)
}
def deleteOutput(incrMetricExpr: Expression): Seq[Seq[Expression]] = {
val deleteExprs = {
// Generate expressions to set ROW_DROPPED_COL = true and CDC_TYPE_COLUMN_NAME =
// CDC_TYPE_NOT_CDC
val mainDataOutput = targetOutputCols :+ TrueLiteral :+ incrMetricExpr :+
CDC_TYPE_NOT_CDC
if (cdcEnabled) {
// For delete we do a no-op copy with ROW_DROPPED_COL = false, INCR_ROW_COUNT_COL as a
// no-op (because the metric will be incremented in `mainDataOutput`) and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_DELETE
val deleteCdcOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+ CDC_TYPE_DELETE
Seq(mainDataOutput, deleteCdcOutput)
} else {
Seq(mainDataOutput)
}
}
deleteExprs.map(resolveOnJoinedPlan)
}
def insertOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression)
: Seq[Seq[Expression]] = {
// Generate insert expressions and set ROW_DROPPED_COL = false and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
val insertExprs = resolvedActions.map(_.expr)
val mainDataOutput = resolveOnJoinedPlan(
if (isDeleteWithDuplicateMatchesAndCdc) {
// This must be a delete-when-matched merge with duplicate matches plus an insert clause,
// so we must keep the target row id and source row id. Since this is a not-matched
// clause we know the target row id will be null. See above at
// isDeleteWithDuplicateMatchesAndCdc definition for more details.
insertExprs :+
Alias(Literal(null), TARGET_ROW_ID_COL)() :+ UnresolvedAttribute(SOURCE_ROW_ID_COL) :+
FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC
} else {
insertExprs :+ FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC
}
)
if (cdcEnabled) {
// For insert we have the same expressions as for mainDataOutput, but with
// INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
// `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_INSERT
val insertCdcOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+ Literal(CDC_TYPE_INSERT)
Seq(mainDataOutput, insertCdcOutput)
} else {
Seq(mainDataOutput)
}
}
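// Dispatch on the clause type to build its output expressions, combining the aggregate metric
// (e.g. numTargetRowsUpdated) with the clause-specific one (e.g. numTargetRowsMatchedUpdated).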
def clauseOutput(clause: DeltaMergeIntoClause): Seq[Seq[Expression]] = clause match {
case u: DeltaMergeIntoMatchedUpdateClause =>
updateOutput(u.resolvedActions, And(incrUpdatedCountExpr, incrUpdatedMatchedCountExpr))
case _: DeltaMergeIntoMatchedDeleteClause =>
deleteOutput(And(incrDeletedCountExpr, incrDeletedMatchedCountExpr))
case i: DeltaMergeIntoNotMatchedInsertClause =>
insertOutput(i.resolvedActions, incrInsertedCountExpr)
case u: DeltaMergeIntoNotMatchedBySourceUpdateClause =>
updateOutput(
u.resolvedActions,
And(incrUpdatedCountExpr, incrUpdatedNotMatchedBySourceCountExpr))
case _: DeltaMergeIntoNotMatchedBySourceDeleteClause =>
deleteOutput(And(incrDeletedCountExpr, incrDeletedNotMatchedBySourceCountExpr))
}
def clauseCondition(clause: DeltaMergeIntoClause): Expression = {
// if condition is None, then expression always evaluates to true
val condExpr = clause.condition.getOrElse(TrueLiteral)
resolveOnJoinedPlan(Seq(condExpr)).head
}
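// The JoinedRowProcessor evaluates, for each joined row, which clause condition (if any) applies
// and emits the corresponding output rows; rows that match no applicable clause fall back to
// noopCopyOutput (copy the target row unchanged) or deleteRowOutput (drop the row). The two
// encoders convert between the joined schema and the output row schema.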
val joinedRowEncoder = RowEncoder(joinedPlan.schema)
val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind()
val processor = new JoinedRowProcessor(
targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head,
sourceRowHasNoMatch = resolveOnJoinedPlan(Seq(col(TARGET_ROW_PRESENT_COL).isNull.expr)).head,
matchedConditions = matchedClauses.map(clauseCondition),
matchedOutputs = matchedClauses.map(clauseOutput),
notMatchedConditions = notMatchedClauses.map(clauseCondition),
notMatchedOutputs = notMatchedClauses.map(clauseOutput),
notMatchedBySourceConditions = notMatchedBySourceClauses.map(clauseCondition),
notMatchedBySourceOutputs = notMatchedBySourceClauses.map(clauseOutput),
noopCopyOutput =
resolveOnJoinedPlan(targetOutputCols :+ FalseLiteral :+ incrNoopCountExpr :+
CDC_TYPE_NOT_CDC),
deleteRowOutput =
resolveOnJoinedPlan(targetOutputCols :+ TrueLiteral :+ TrueLiteral :+ CDC_TYPE_NOT_CDC),
joinedAttributes = joinedPlan.output,
joinedRowEncoder = joinedRowEncoder,
outputRowEncoder = outputRowEncoder)
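// Run the processor over every partition of the joined data to produce the final output rows.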
var outputDF =
Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)
if (isDeleteWithDuplicateMatchesAndCdc) {
// When we have a delete-when-matched clause with duplicate matches we have to remove
// duplicate CDC rows. This scenario is further explained at
// isDeleteWithDuplicateMatchesAndCdc definition.
// To remove duplicate CDC rows generated by the duplicate matches we dedupe by
// TARGET_ROW_ID_COL since there should only be one CDC delete row per target row.
// When there is an insert clause in addition to the delete clause we additionally dedupe by
// SOURCE_ROW_ID_COL and CDC_TYPE_COLUMN_NAME to avoid dropping valid duplicate inserted rows
// and their corresponding CDC rows.
val columnsToDedupeBy = if (notMatchedClauses.nonEmpty) { // insert clause
Seq(TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, CDC_TYPE_COLUMN_NAME)
} else {
Seq(TARGET_ROW_ID_COL)
}
outputDF = outputDF
.dropDuplicates(columnsToDedupeBy)
.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL)
} else {
outputDF = outputDF.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL)
}
logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution)
// Write to Delta
val newFiles = deltaTxn
.writeFiles(repartitionIfNeeded(spark, outputDF, deltaTxn.metadata.partitionColumns))
// Update metrics
val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile])
metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile])
metrics("numTargetChangeFileBytes") += newFiles.collect{ case f: AddCDCFile => f.size }.sum
metrics("numTargetBytesAdded") += addedBytes
metrics("numTargetPartitionsAddedTo") += addedPartitions
if (multipleMatchDeleteOnlyOvercount.isDefined) {
// Compensate for counting duplicates during the query.
val actualRowsDeleted =
metrics("numTargetRowsDeleted").value - multipleMatchDeleteOnlyOvercount.get
assert(actualRowsDeleted >= 0)
metrics("numTargetRowsDeleted").set(actualRowsDeleted)
val actualRowsMatchedDeleted =
metrics("numTargetRowsMatchedDeleted").value - multipleMatchDeleteOnlyOvercount.get
assert(actualRowsMatchedDeleted >= 0)
metrics("numTargetRowsMatchedDeleted").set(actualRowsMatchedDeleted)
}
newFiles
}