private def writeAllChanges()

in backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala [660:961]


  private def writeAllChanges(
    spark: SparkSession,
    deltaTxn: OptimisticTransaction,
    filesToRewrite: Seq[AddFile]
  ): Seq[FileAction] = recordMergeOperation(sqlMetricName = "rewriteTimeMs") {
    import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral}

    val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

    var targetOutputCols = getTargetOutputCols(deltaTxn)
    var outputRowSchema = deltaTxn.metadata.schema

    // When we have duplicate matches (only allowed when the sole matched clause is a delete
    // with no condition) we will incorrectly generate duplicate CDC rows.
    // Duplicate matches can be due to:
    // - Duplicate rows in the source w.r.t. the merge condition
    // - A target-only or source-only merge condition, which essentially turns our join into a cross
    //   join with the target/source satisfying the merge condition.
    // These duplicate matches are dropped from the main data output since this is a delete
    // operation, but the duplicate CDC rows are not removed by default.
    // See https://github.com/delta-io/delta/issues/1274

    // We address this specific scenario by adding row ids to the target before performing our join.
    // There should only be one CDC delete row per target row so we can use these row ids to dedupe
    // the duplicate CDC delete rows.

    // We also need to address the scenario when there are duplicate matches with delete and we
    // insert duplicate rows. Here we need to additionally add row ids to the source before the
    // join to avoid dropping these valid duplicate inserted rows and their corresponding cdc rows.

    // When there is an insert clause, we set SOURCE_ROW_ID_COL=null for all delete rows because we
    // need to drop the duplicate matches.
    val isDeleteWithDuplicateMatchesAndCdc = multipleMatchDeleteOnlyOvercount.nonEmpty && cdcEnabled
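
    // Illustrative example (hypothetical, not from the source): with merge condition
    // `s.key = t.key`, a source {k1, k1} and a single target row k1, an unconditional
    // matched DELETE matches that target row twice. The main data output is unaffected
    // (the row is dropped either way), but CDC would emit two delete rows for the one
    // target row unless we dedupe them by the row ids added below.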

    // Generate a new target dataframe that has the same output attribute exprIds as the
    // target plan. This allows us to apply the existing resolved update/insert expressions.
    val baseTargetDF = buildTargetPlanWithFiles(spark, deltaTxn, filesToRewrite)
    val joinType = if (hasNoInserts &&
      spark.conf.get(DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED)) {
      "rightOuter"
    } else {
      "fullOuter"
    }

    logDebug(s"""writeAllChanges using $joinType join:
                |  source.output: ${source.outputSet}
                |  target.output: ${target.outputSet}
                |  condition: $condition
                |  newTarget.output: ${baseTargetDF.queryExecution.logical.outputSet}
       """.stripMargin)

    // UDFs to update metrics
    val incrSourceRowCountExpr = makeMetricUpdateUDF("numSourceRowsInSecondScan")
    val incrUpdatedCountExpr = makeMetricUpdateUDF("numTargetRowsUpdated")
    val incrUpdatedMatchedCountExpr = makeMetricUpdateUDF("numTargetRowsMatchedUpdated")
    val incrUpdatedNotMatchedBySourceCountExpr =
      makeMetricUpdateUDF("numTargetRowsNotMatchedBySourceUpdated")
    val incrInsertedCountExpr = makeMetricUpdateUDF("numTargetRowsInserted")
    val incrNoopCountExpr = makeMetricUpdateUDF("numTargetRowsCopied")
    val incrDeletedCountExpr = makeMetricUpdateUDF("numTargetRowsDeleted")
    val incrDeletedMatchedCountExpr = makeMetricUpdateUDF("numTargetRowsMatchedDeleted")
    val incrDeletedNotMatchedBySourceCountExpr =
      makeMetricUpdateUDF("numTargetRowsNotMatchedBySourceDeleted")

    // Apply an outer join to find both matches and non-matches. We are adding two boolean fields
    // with value `true`, one to each side of the join. Whether each field is null after the
    // outer join lets us identify whether the resultant joined row was a matched inner result
    // or an unmatched result with nulls on one side.
    // We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate
    // matches and CDC is enabled, and additionally add row IDs to the source if we also have an
    // insert clause. See above at isDeleteWithDuplicateMatchesAndCdc definition for more details.
    var sourceDF = getSourceDF()
      .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
    var targetDF = baseTargetDF
      .withColumn(TARGET_ROW_PRESENT_COL, lit(true))
    if (isDeleteWithDuplicateMatchesAndCdc) {
      targetDF = targetDF.withColumn(TARGET_ROW_ID_COL, monotonically_increasing_id())
      if (notMatchedClauses.nonEmpty) { // insert clause
        sourceDF = sourceDF.withColumn(SOURCE_ROW_ID_COL, monotonically_increasing_id())
      }
    }
    val joinedDF = sourceDF.join(targetDF, new Column(condition), joinType)
    val joinedPlan = joinedDF.queryExecution.analyzed
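
    // Illustrative shape of the joined rows (hypothetical layout, not from the source):
    //   matched:                  (src cols..., true, tgt cols..., true)
    //   source row with no match: (src cols..., true, nulls...,    null)
    //   target row with no match: (nulls...,    null, tgt cols..., true)
    // The null-ness of SOURCE_ROW_PRESENT_COL / TARGET_ROW_PRESENT_COL is exactly what
    // targetRowHasNoMatch / sourceRowHasNoMatch test further below.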

    def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = {
      tryResolveReferencesForExpressions(spark, exprs, joinedPlan)
    }

    // ==== Generate the expressions to process full-outer join output and generate target rows ====
    // If there are N columns in the target table, there will be N + 3 columns after processing
    // - N columns for the target table
    // - ROW_DROPPED_COL to define whether the generated row should be dropped or written
    // - INCR_ROW_COUNT_COL containing a UDF to update the output row counter
    // - CDC_TYPE_COLUMN_NAME containing the type of change being performed in a particular row

    // To generate these N + 3 columns, we will generate N + 3 expressions and apply them to the
    // rows in the joinedDF. The CDC column will either be used for CDC generation or dropped
    // before performing the final write, and the other two will always be dropped after
    // executing the metrics UDF and filtering on ROW_DROPPED_COL.

    // We produce rows for both the main table data (with CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC),
    // and rows for the CDC data which will be output to CDCReader.CDC_LOCATION.
    // See [[CDCReader]] for general details on how partitioning on the CDC type column works.

    // In the following functions `updateOutput`, `deleteOutput` and `insertOutput`, we
    // produce a Seq[Expression] for each intended output row.
    // Depending on the clause and whether CDC is enabled, we output between 0 and 3 rows as a
    // Seq[Seq[Expression]].

    // There is one corner case outlined above at isDeleteWithDuplicateMatchesAndCdc definition.
    // When we have a delete-ONLY merge with duplicate matches we have N + 4 columns:
    // N target cols, TARGET_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME
    // When we have a delete-when-matched merge with duplicate matches + an insert clause, we have
    // N + 5 columns:
    // N target cols, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL,
    // CDC_TYPE_COLUMN_NAME
    // These ROW_ID_COL will always be dropped before the final write.
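
    // Worked example (hypothetical target schema, not from the source): for a target table
    // (key, value), the processed row layout is
    //   (key, value, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME)      // N + 3
    // and, for a duplicate-match delete with an insert clause, it becomes
    //   (key, value, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL,
    //    ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME)                  // N + 5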

    if (isDeleteWithDuplicateMatchesAndCdc) {
      targetOutputCols = targetOutputCols :+ UnresolvedAttribute(TARGET_ROW_ID_COL)
      outputRowSchema = outputRowSchema.add(TARGET_ROW_ID_COL, DataTypes.LongType)
      if (notMatchedClauses.nonEmpty) { // there is an insert clause, make SOURCE_ROW_ID_COL=null
        targetOutputCols = targetOutputCols :+ Alias(Literal(null), SOURCE_ROW_ID_COL)()
        outputRowSchema = outputRowSchema.add(SOURCE_ROW_ID_COL, DataTypes.LongType)
      }
    }

    if (cdcEnabled) {
      outputRowSchema = outputRowSchema
        .add(ROW_DROPPED_COL, DataTypes.BooleanType)
        .add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
        .add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
    }

    def updateOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression)
      : Seq[Seq[Expression]] = {
      val updateExprs = {
        // Generate update expressions and set ROW_DROPPED_COL = false and
        // CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
        val mainDataOutput = resolvedActions.map(_.expr) :+ FalseLiteral :+
          incrMetricExpr :+ CDC_TYPE_NOT_CDC
        if (cdcEnabled) {
          // For the update preimage, we do a no-op copy with ROW_DROPPED_COL = false,
          // CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_PREIMAGE and INCR_ROW_COUNT_COL as a no-op
          // (because the metric will be incremented in `mainDataOutput`)
          val preImageOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+
            Literal(CDC_TYPE_UPDATE_PREIMAGE)
          // For update postimage, we have the same expressions as for mainDataOutput but with
          // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
          // `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_POSTIMAGE
          val postImageOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+
            Literal(CDC_TYPE_UPDATE_POSTIMAGE)
          Seq(mainDataOutput, preImageOutput, postImageOutput)
        } else {
          Seq(mainDataOutput)
        }
      }
      updateExprs.map(resolveOnJoinedPlan)
    }
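
    // Example output of updateOutput (hypothetical values): with CDC enabled, updating target
    // row (k1, 1) to (k1, 2) yields three expression rows:
    //   main data: (k1, 2, false, incrMetricExpr, CDC_TYPE_NOT_CDC)
    //   preimage:  (k1, 1, false, true,           CDC_TYPE_UPDATE_PREIMAGE)
    //   postimage: (k1, 2, false, true,           CDC_TYPE_UPDATE_POSTIMAGE)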

    def deleteOutput(incrMetricExpr: Expression): Seq[Seq[Expression]] = {
      val deleteExprs = {
        // Generate expressions to set ROW_DROPPED_COL = true and CDC_TYPE_COLUMN_NAME =
        // CDC_TYPE_NOT_CDC
        val mainDataOutput = targetOutputCols :+ TrueLiteral :+ incrMetricExpr :+
          CDC_TYPE_NOT_CDC
        if (cdcEnabled) {
          // For the CDC delete row we do a no-op copy with ROW_DROPPED_COL = false,
          // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
          // `mainDataOutput`) and CDC_TYPE_COLUMN_NAME = CDC_TYPE_DELETE
          val deleteCdcOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+ CDC_TYPE_DELETE
          Seq(mainDataOutput, deleteCdcOutput)
        } else {
          Seq(mainDataOutput)
        }
      }
      deleteExprs.map(resolveOnJoinedPlan)
    }

    def insertOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression)
      : Seq[Seq[Expression]] = {
      // Generate insert expressions and set ROW_DROPPED_COL = false and
      // CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
      val insertExprs = resolvedActions.map(_.expr)
      val mainDataOutput = resolveOnJoinedPlan(
        if (isDeleteWithDuplicateMatchesAndCdc) {
          // This must be a delete-when-matched merge with duplicate matches + an insert clause.
          // Therefore we must keep the target row id and source row id. Since this is a
          // not-matched clause, we know the target row id will be null. See above at the
          // isDeleteWithDuplicateMatchesAndCdc definition for more details.
          insertExprs :+
            Alias(Literal(null), TARGET_ROW_ID_COL)() :+ UnresolvedAttribute(SOURCE_ROW_ID_COL) :+
            FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC
        } else {
          insertExprs :+ FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC
        }
      )
      if (cdcEnabled) {
        // For insert we have the same expressions as for mainDataOutput, but with
        // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
        // `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_INSERT
        val insertCdcOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+ Literal(CDC_TYPE_INSERT)
        Seq(mainDataOutput, insertCdcOutput)
      } else {
        Seq(mainDataOutput)
      }
    }

    def clauseOutput(clause: DeltaMergeIntoClause): Seq[Seq[Expression]] = clause match {
      case u: DeltaMergeIntoMatchedUpdateClause =>
        updateOutput(u.resolvedActions, And(incrUpdatedCountExpr, incrUpdatedMatchedCountExpr))
      case _: DeltaMergeIntoMatchedDeleteClause =>
        deleteOutput(And(incrDeletedCountExpr, incrDeletedMatchedCountExpr))
      case i: DeltaMergeIntoNotMatchedInsertClause =>
        insertOutput(i.resolvedActions, incrInsertedCountExpr)
      case u: DeltaMergeIntoNotMatchedBySourceUpdateClause =>
        updateOutput(
          u.resolvedActions,
          And(incrUpdatedCountExpr, incrUpdatedNotMatchedBySourceCountExpr))
      case _: DeltaMergeIntoNotMatchedBySourceDeleteClause =>
        deleteOutput(And(incrDeletedCountExpr, incrDeletedNotMatchedBySourceCountExpr))
    }

    def clauseCondition(clause: DeltaMergeIntoClause): Expression = {
      // If the condition is None, the expression always evaluates to true
      val condExpr = clause.condition.getOrElse(TrueLiteral)
      resolveOnJoinedPlan(Seq(condExpr)).head
    }

    val joinedRowEncoder = RowEncoder(joinedPlan.schema)
    val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind()

    val processor = new JoinedRowProcessor(
      targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head,
      sourceRowHasNoMatch = resolveOnJoinedPlan(Seq(col(TARGET_ROW_PRESENT_COL).isNull.expr)).head,
      matchedConditions = matchedClauses.map(clauseCondition),
      matchedOutputs = matchedClauses.map(clauseOutput),
      notMatchedConditions = notMatchedClauses.map(clauseCondition),
      notMatchedOutputs = notMatchedClauses.map(clauseOutput),
      notMatchedBySourceConditions = notMatchedBySourceClauses.map(clauseCondition),
      notMatchedBySourceOutputs = notMatchedBySourceClauses.map(clauseOutput),
      noopCopyOutput =
        resolveOnJoinedPlan(targetOutputCols :+ FalseLiteral :+ incrNoopCountExpr :+
          CDC_TYPE_NOT_CDC),
      deleteRowOutput =
        resolveOnJoinedPlan(targetOutputCols :+ TrueLiteral :+ TrueLiteral :+ CDC_TYPE_NOT_CDC),
      joinedAttributes = joinedPlan.output,
      joinedRowEncoder = joinedRowEncoder,
      outputRowEncoder = outputRowEncoder)

    var outputDF =
      Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)

    if (isDeleteWithDuplicateMatchesAndCdc) {
      // When we have a delete-when-matched clause with duplicate matches, we have to remove
      // duplicate CDC rows. This scenario is further explained at the
      // isDeleteWithDuplicateMatchesAndCdc definition.

      // To remove duplicate CDC rows generated by the duplicate matches we dedupe by
      // TARGET_ROW_ID_COL since there should only be one CDC delete row per target row.
      // When there is an insert clause in addition to the delete clause we additionally dedupe by
      // SOURCE_ROW_ID_COL and CDC_TYPE_COLUMN_NAME to avoid dropping valid duplicate inserted rows
      // and their corresponding CDC rows.
      val columnsToDedupeBy = if (notMatchedClauses.nonEmpty) { // insert clause
        Seq(TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, CDC_TYPE_COLUMN_NAME)
      } else {
        Seq(TARGET_ROW_ID_COL)
      }
      outputDF = outputDF
        .dropDuplicates(columnsToDedupeBy)
        .drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL)
    } else {
      outputDF = outputDF.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL)
    }
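
    // Example (hypothetical): two duplicate matches on the target row with TARGET_ROW_ID_COL = 7
    // produce two identical CDC delete rows; deduping on TARGET_ROW_ID_COL keeps one of them.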

    logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution)

    // Write to Delta
    val newFiles = deltaTxn
      .writeFiles(repartitionIfNeeded(spark, outputDF, deltaTxn.metadata.partitionColumns))

    // Update metrics
    val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
    metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile])
    metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile])
    metrics("numTargetChangeFileBytes") += newFiles.collect{ case f: AddCDCFile => f.size }.sum
    metrics("numTargetBytesAdded") += addedBytes
    metrics("numTargetPartitionsAddedTo") += addedPartitions
    if (multipleMatchDeleteOnlyOvercount.isDefined) {
      // Compensate for counting duplicates during the query.
      val actualRowsDeleted =
        metrics("numTargetRowsDeleted").value - multipleMatchDeleteOnlyOvercount.get
      assert(actualRowsDeleted >= 0)
      metrics("numTargetRowsDeleted").set(actualRowsDeleted)
      val actualRowsMatchedDeleted =
        metrics("numTargetRowsMatchedDeleted").value - multipleMatchDeleteOnlyOvercount.get
      assert(actualRowsMatchedDeleted >= 0)
      metrics("numTargetRowsMatchedDeleted").set(actualRowsMatchedDeleted)
    }

    newFiles
  }
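
A minimal standalone sketch of the row-id dedup technique used above, assuming only stock
Spark APIs; the DataFrame contents, the column name `_target_row_id_`, and the object name are
hypothetical and only illustrate the dropDuplicates semantics that writeAllChanges relies on.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.monotonically_increasing_id

  object RowIdDedupSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
      import spark.implicits._

      // Each target row gets a unique id before the join, mirroring
      // targetDF.withColumn(TARGET_ROW_ID_COL, monotonically_increasing_id()).
      val target = Seq(("k1", 1), ("k2", 2)).toDF("key", "value")
        .withColumn("_target_row_id_", monotonically_increasing_id())

      // A source with duplicate keys w.r.t. the join condition produces duplicate matches:
      // joining on "key" yields two rows for k1, i.e. two would-be CDC delete rows.
      val source = Seq("k1", "k1", "k2").toDF("key")
      val joined = source.join(target, "key")

      // Deduping by the target row id keeps exactly one row per target row, which is what
      // writeAllChanges does via dropDuplicates(columnsToDedupeBy).
      val deduped = joined.dropDuplicates(Seq("_target_row_id_"))
      deduped.show() // two rows remain: one for k1, one for k2

      spark.stop()
    }
  }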