private def performUpdate()

in backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala [89:250]

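Core of the UPDATE command in the ClickHouse backend's Delta 2.0 support: it prunes candidate files with the partition predicates, scans them to find the files containing rows that match the condition, rewrites those files with the updated values, commits the resulting remove/add actions, and reconciles the SQL metrics.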

  private def performUpdate(
      sparkSession: SparkSession,
      deltaLog: DeltaLog,
      txn: OptimisticTransaction): Unit = {
    import sparkSession.implicits._

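    // Running totals for the SQL metrics and the delta.dml.update.stats event emitted below.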
    var numTouchedFiles: Long = 0
    var numRewrittenFiles: Long = 0
    var numAddedChangeFiles: Long = 0
    var changeFileBytes: Long = 0
    var scanTimeMs: Long = 0
    var rewriteTimeMs: Long = 0

    val startTime = System.nanoTime()
    val numFilesTotal = deltaLog.snapshot.numOfFiles

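    // Split the UPDATE condition into partition-column (metadata) predicates and data
    // predicates, then prune the snapshot's files with both.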
    val updateCondition = condition.getOrElse(Literal.TrueLiteral)
    val (metadataPredicates, dataPredicates) =
      DeltaTableUtils.splitMetadataAndDataPredicates(
        updateCondition,
        txn.metadata.partitionColumns,
        sparkSession)
    val candidateFiles = txn.filterFiles(metadataPredicates ++ dataPredicates)
    val nameToAddFile = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)

    scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

    val filesToRewrite: Seq[AddFile] = if (candidateFiles.isEmpty) {
      // Case 1: Do nothing if no row matches the partition predicates
      // that are part of the UPDATE condition
      Nil
    } else if (dataPredicates.isEmpty) {
      // Case 2: The data filter is empty, so update every row in the files
      // of the partitions matching the metadata predicates
      candidateFiles
    } else {
      // Case 3: Find all the affected files using the user-specified condition
      val fileIndex = new TahoeBatchFileIndex(
        sparkSession,
        "update",
        candidateFiles,
        deltaLog,
        tahoeFileIndex.path,
        txn.snapshot)
      // Keep everything from the resolved target except a new TahoeFileIndex
      // that only involves the affected files instead of all files.
      val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
      val data = Dataset.ofRows(sparkSession, newTarget)
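      // Count every row that satisfies the condition via a side-effecting UDF; it is
      // marked non-deterministic so the optimizer cannot collapse the call or evaluate
      // it before the condition filter.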
      val updatedRowCount = metrics("numUpdatedRows")
      val updatedRowUdf = udf {
        () =>
          updatedRowCount += 1
          true
      }.asNondeterministic()
      val pathsToRewrite =
        withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) {
          // --- modified start
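          // Collect the distinct paths of the files that produced at least one matching row.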
          data
            .filter(new Column(updateCondition))
            .filter(updatedRowUdf())
            .select(input_file_name().as("input_files"))
            .distinct()
            .as[String]
            .collect()
          // --- modified end
        }

      scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

      pathsToRewrite.map(getTouchedFile(deltaLog.dataPath, _, nameToAddFile)).toSeq
    }

    numTouchedFiles = filesToRewrite.length

    val newActions = if (filesToRewrite.isEmpty) {
      // Do nothing if no row matches the UPDATE condition
      Nil
    } else {
      // Generate the new files containing the updated values
      withStatusCode("DELTA", UpdateCommand.rewritingFilesMsg(filesToRewrite.size)) {
        rewriteFiles(
          sparkSession,
          txn,
          tahoeFileIndex.path,
          filesToRewrite.map(_.path),
          nameToAddFile,
          updateCondition)
      }
    }

    rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs

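    // Separate the CDC file actions from the rewritten data file actions for the metrics below.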
    val (changeActions, addActions) = newActions.partition(_.isInstanceOf[AddCDCFile])
    numRewrittenFiles = addActions.size
    numAddedChangeFiles = changeActions.size
    changeFileBytes = changeActions.collect { case f: AddCDCFile => f.size }.sum

    val totalActions = if (filesToRewrite.isEmpty) {
      // Do nothing if no row matches the UPDATE condition
      Nil
    } else {
      // Delete the old files and return those delete actions along with the new AddFile actions for
      // files containing the updated values
      val operationTimestamp = System.currentTimeMillis()
      val deleteActions = filesToRewrite.map(_.removeWithTimestamp(operationTimestamp))

      deleteActions ++ newActions
    }

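    // Record metrics and commit only when at least one file was actually rewritten.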
    if (totalActions.nonEmpty) {
      metrics("numAddedFiles").set(numRewrittenFiles)
      metrics("numAddedChangeFiles").set(numAddedChangeFiles)
      metrics("changeFileBytes").set(changeFileBytes)
      metrics("numRemovedFiles").set(numTouchedFiles)
      metrics("executionTimeMs").set((System.nanoTime() - startTime) / 1000 / 1000)
      metrics("scanTimeMs").set(scanTimeMs)
      metrics("rewriteTimeMs").set(rewriteTimeMs)
      // When numUpdatedRows is not captured by the UDF, we can recover the metric from
      // the BasicWriteStatsTracker. This covers case 2, where the update condition contains
      // only metadata predicates and entire partitions are rewritten.
      val outputRows = txn.getMetric("numOutputRows").map(_.value).getOrElse(-1L)
      if (
        metrics("numUpdatedRows").value == 0 && outputRows != 0 &&
        metrics("numCopiedRows").value == 0
      ) {
        // We know that numTouchedRows = numCopiedRows + numUpdatedRows.
        // Since an entire partition was re-written, no rows were copied.
        // So numTouchedRows == numUpdatedRows.
        metrics("numUpdatedRows").set(metrics("numTouchedRows").value)
      } else {
        // This is for case 3, where the update condition contains both metadata and data
        // predicates, so relevant files will have some rows updated and some rows copied.
        // We don't need to consider case 1 here, where no files match the update condition,
        // as we know that `totalActions` is empty.
        metrics("numCopiedRows").set(
          metrics("numTouchedRows").value - metrics("numUpdatedRows").value)
      }
      txn.registerSQLMetrics(sparkSession, metrics)
      txn.commit(totalActions, DeltaOperations.Update(condition.map(_.toString)))
      // This is needed to make the SQL metrics visible in the Spark UI
      val executionId = sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
      SQLMetrics.postDriverMetricUpdates(
        sparkSession.sparkContext,
        executionId,
        metrics.values.toSeq)
    }

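    // Always emit the usage-stats event, even when no file needed rewriting.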
    recordDeltaEvent(
      deltaLog,
      "delta.dml.update.stats",
      data = UpdateMetric(
        condition = condition.map(_.sql).getOrElse("true"),
        numFilesTotal,
        numTouchedFiles,
        numRewrittenFiles,
        numAddedChangeFiles,
        changeFileBytes,
        scanTimeMs,
        rewriteTimeMs
      )
    )
  }
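
For reference, a minimal self-contained sketch of the Case 3 touched-files scan, assuming a plain DataFrame in place of the Delta target and a long accumulator in place of the command's "numUpdatedRows" SQLMetric; findTouchedFiles, df, and cond are illustrative names, not part of this file:

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{input_file_name, udf}

// Hypothetical helper mirroring the scan above: returns the distinct set of
// files containing at least one row matching `cond`, counting the matching
// rows as a side effect.
def findTouchedFiles(spark: SparkSession, df: DataFrame, cond: Column): Array[String] = {
  import spark.implicits._
  val matchedRows = spark.sparkContext.longAccumulator("matchedRows")
  // Marked non-deterministic so the optimizer cannot fold the call away or
  // evaluate it before the filter on `cond`.
  val countRowUdf = udf {
    () =>
      matchedRows.add(1)
      true
  }.asNondeterministic()

  df.filter(cond) // rows matching the UPDATE condition
    .filter(countRowUdf()) // count each matching row exactly once
    .select(input_file_name().as("input_files")) // physical file of each row
    .distinct() // one entry per touched file
    .as[String]
    .collect() // small driver-side array of file paths to rewrite
}

Collecting only the distinct file names keeps the driver-side result proportional to the number of touched files rather than the number of matching rows.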