private def performUpdate()

in backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala [115:329]


  private def performUpdate(
      sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction): Unit = {
    import org.apache.spark.sql.delta.implicits._

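    // Counters for the operation metrics reported at the end of the command.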
    var numTouchedFiles: Long = 0
    var numRewrittenFiles: Long = 0
    var numAddedBytes: Long = 0
    var numRemovedBytes: Long = 0
    var numAddedChangeFiles: Long = 0
    var changeFileBytes: Long = 0
    var scanTimeMs: Long = 0
    var rewriteTimeMs: Long = 0
    var numDeletionVectorsAdded: Long = 0
    var numDeletionVectorsRemoved: Long = 0
    var numDeletionVectorsUpdated: Long = 0

    val startTime = System.nanoTime()
    val numFilesTotal = txn.snapshot.numOfFiles

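    // Split the UPDATE condition into partition (metadata) predicates and data predicates,
    // so partition-only conditions can be resolved without scanning file contents.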
    val updateCondition = condition.getOrElse(Literal.TrueLiteral)
    val (metadataPredicates, dataPredicates) =
      DeltaTableUtils.splitMetadataAndDataPredicates(
        updateCondition, txn.metadata.partitionColumns, sparkSession)

    // Should we write the DVs to represent updated rows?
    val shouldWriteDeletionVectors = shouldWritePersistentDeletionVectors(sparkSession, txn)
    val candidateFiles = txn.filterFiles(
      metadataPredicates ++ dataPredicates,
      keepNumRecords = shouldWriteDeletionVectors)

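    // Map file names back to their AddFile entries so paths returned by the scan can be resolved.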
    val nameToAddFile = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)

    scanTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)

    val filesToRewrite: Seq[TouchedFileWithDV] = if (candidateFiles.isEmpty) {
      // Case 1: Do nothing if no files match the partition predicates
      // that are part of the UPDATE condition
      Nil
    } else if (dataPredicates.isEmpty) {
      // Case 2: Update all the rows from the files that are in the specified partitions
      // when the data filter is empty
      candidateFiles
        .map(f => TouchedFileWithDV(f.path, f, newDeletionVector = null, deletedRows = 0L))
    } else {
      // Case 3: Find all the affected files using the user-specified condition
      val fileIndex = new TahoeBatchFileIndex(
        sparkSession, "update", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)

      val touchedFilesWithDV = if (shouldWriteDeletionVectors) {
        // Case 3.1: Find all the affected files via DV path
        val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
          sparkSession,
          target,
          fileIndex)

        // Does the target table already have DVs enabled? If so, we need to read the table
        // with deletion vectors.
        val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)

        DMLWithDeletionVectorsHelper.findTouchedFiles(
          sparkSession,
          txn,
          mustReadDeletionVectors,
          deltaLog,
          targetDf,
          fileIndex,
          updateCondition,
          opName = "UPDATE")
      } else {
        // Case 3.2: Find all the affected files using the non-DV path
        // Keep everything from the resolved target except the file index, which is replaced
        // with a new TahoeFileIndex that only covers the affected files instead of all files.
        val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
        val data = Dataset.ofRows(sparkSession, newTarget)
        val incrUpdatedCountExpr = IncrementMetric(TrueLiteral, metrics("numUpdatedRows"))
        val pathsToRewrite =
          withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) {
            // --- modified start
            data.filter(new Column(updateCondition))
              .select(input_file_name().as("input_files"))
              .filter(new Column(incrUpdatedCountExpr))
              .distinct()
              .as[String]
              .collect()
            // --- modified end
          }

        // Wrap each AddFile into a TouchedFileWithDV that carries an empty DV.
        pathsToRewrite
          .map(getTouchedFile(deltaLog.dataPath, _, nameToAddFile))
          .map(f => TouchedFileWithDV(f.path, f, newDeletionVector = null, deletedRows = 0L))
          .toSeq
      }
      // Refresh scan time for Case 3, since we performed the scan here.
      scanTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
      touchedFilesWithDV
    }

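    // Assemble the full set of file actions: DV updates/removals for existing files
    // plus the files newly written by the rewrite.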
    val totalActions = {
      // When DV is on, we first mask removed rows with DVs and generate (remove, add) pairs.
      val actionsForExistingFiles = if (shouldWriteDeletionVectors) {
        // When there's no data predicate, all matched files are removed.
        if (dataPredicates.isEmpty) {
          val operationTimestamp = System.currentTimeMillis()
          filesToRewrite.map(_.fileLogEntry.removeWithTimestamp(operationTimestamp))
        } else {
          // When there is a data predicate, we generate (remove, add) pairs.
          val filesToRewriteWithDV = filesToRewrite.filter(_.newDeletionVector != null)
          val (dvActions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
            sparkSession,
            filesToRewriteWithDV,
            txn.snapshot)
          metrics("numUpdatedRows").set(metricMap("numModifiedRows"))
          numDeletionVectorsAdded = metricMap("numDeletionVectorsAdded")
          numDeletionVectorsRemoved = metricMap("numDeletionVectorsRemoved")
          numDeletionVectorsUpdated = metricMap("numDeletionVectorsUpdated")
          numTouchedFiles = metricMap("numRemovedFiles")
          dvActions
        }
      } else {
        // Without DVs, we leave the work to `rewriteFiles` below.
        Nil
      }

      // When DV is on, we write out updated rows only. The return value will be only `add` actions.
      // When DV is off, we write out updated rows plus unmodified rows from the same file, then
      // return `add` and `remove` actions.
      val rewriteStartNs = System.nanoTime()
      val actionsForNewFiles =
        withStatusCode("DELTA", UpdateCommand.rewritingFilesMsg(filesToRewrite.size)) {
          if (filesToRewrite.nonEmpty) {
            rewriteFiles(
              sparkSession,
              txn,
              rootPath = tahoeFileIndex.path,
              inputLeafFiles = filesToRewrite.map(_.fileLogEntry),
              nameToAddFileMap = nameToAddFile,
              condition = updateCondition,
              generateRemoveFileActions = !shouldWriteDeletionVectors,
              copyUnmodifiedRows = !shouldWriteDeletionVectors)
          } else {
            Nil
          }
        }
      rewriteTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - rewriteStartNs)

      numTouchedFiles = filesToRewrite.length
      val (addActions, removeActions) = actionsForNewFiles.partition(_.isInstanceOf[AddFile])
      numRewrittenFiles = addActions.size
      numAddedBytes = addActions.map(_.getFileSize).sum
      numRemovedBytes = removeActions.map(_.getFileSize).sum

      actionsForExistingFiles ++ actionsForNewFiles
    }

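    // Collect the Change Data Feed files produced by the rewrite for CDC metrics.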
    val changeActions = totalActions.collect { case f: AddCDCFile => f }
    numAddedChangeFiles = changeActions.size
    changeFileBytes = changeActions.map(_.size).sum

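    // Publish the collected counters as SQL metrics.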
    metrics("numAddedFiles").set(numRewrittenFiles)
    metrics("numAddedBytes").set(numAddedBytes)
    metrics("numAddedChangeFiles").set(numAddedChangeFiles)
    metrics("changeFileBytes").set(changeFileBytes)
    metrics("numRemovedFiles").set(numTouchedFiles)
    metrics("numRemovedBytes").set(numRemovedBytes)
    metrics("executionTimeMs").set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime))
    metrics("scanTimeMs").set(scanTimeMs)
    metrics("rewriteTimeMs").set(rewriteTimeMs)
    // In the case where the numUpdatedRows is not captured, we can siphon out the metrics from
    // the BasicWriteStatsTracker. This is for case 2 where the update condition contains only
    // metadata predicates and so the entire partition is re-written.
    val outputRows = txn.getMetric("numOutputRows").map(_.value).getOrElse(-1L)
    if (metrics("numUpdatedRows").value == 0 && outputRows != 0 &&
      metrics("numCopiedRows").value == 0) {
      // We know that numTouchedRows = numCopiedRows + numUpdatedRows.
      // Since an entire partition was re-written, no rows were copied.
      // So numTouchedRows == numUpdatedRows.
      metrics("numUpdatedRows").set(metrics("numTouchedRows").value)
    } else {
      // This is for case 3 where the update condition contains both metadata and data predicates
      // so relevant files will have some rows updated and some rows copied. We don't need to
      // consider case 1 here, where no files match the update condition, as we know that
      // `totalActions` is empty.
      metrics("numCopiedRows").set(
        metrics("numTouchedRows").value - metrics("numUpdatedRows").value)
      metrics("numDeletionVectorsAdded").set(numDeletionVectorsAdded)
      metrics("numDeletionVectorsRemoved").set(numDeletionVectorsRemoved)
      metrics("numDeletionVectorsUpdated").set(numDeletionVectorsUpdated)
    }
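    // Register the SQL metrics with the transaction so they are included in the operation metrics.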
    txn.registerSQLMetrics(sparkSession, metrics)

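    // Combine the optional SetTransaction action with the data actions and commit them
    // as a single UPDATE operation.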
    val finalActions = createSetTransaction(sparkSession, deltaLog).toSeq ++ totalActions
    txn.commitIfNeeded(
      actions = finalActions,
      op = DeltaOperations.Update(condition),
      tags = RowTracking.addPreservedRowTrackingTagIfNotSet(txn.snapshot))
    sendDriverMetrics(sparkSession, metrics)

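    // Emit a usage event with the collected statistics for this UPDATE.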
    recordDeltaEvent(
      deltaLog,
      "delta.dml.update.stats",
      data = UpdateMetric(
        condition = condition.map(_.sql).getOrElse("true"),
        numFilesTotal,
        numTouchedFiles,
        numRewrittenFiles,
        numAddedChangeFiles,
        changeFileBytes,
        scanTimeMs,
        rewriteTimeMs,
        numDeletionVectorsAdded,
        numDeletionVectorsRemoved,
        numDeletionVectorsUpdated)
    )
  }
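
For context, this method sits behind an ordinary Delta UPDATE. Below is a minimal sketch of reaching it through the Scala DeltaTable API, assuming an active SparkSession `spark`; the table path, columns, and predicate are illustrative, not taken from this repository:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{expr, lit}

// Hypothetical table path and columns. An UPDATE on a Delta table, whether issued as SQL
// or via this API, is planned as UpdateCommand, whose run() opens an OptimisticTransaction
// and invokes the performUpdate() shown above.
val table = DeltaTable.forPath(spark, "/tmp/delta/events")
table.update(
  condition = expr("country = 'US'"),
  set = Map("status" -> lit("archived")))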