def performDelete()

in backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala [146:403]
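
performDelete builds the sequence of Delta log actions for a DELETE. It distinguishes three cases: no condition (remove every file in the table), a condition on partition columns only (remove files using metadata alone), and a condition that needs data (scan the candidate files, then either write deletion vectors or rewrite the touched files). The sketch below shows how such a method is typically driven from the command's run(); it is a minimal sketch modeled on the stock Delta DeleteCommand, and the recordDeltaOperation / withNewTransaction / commit wiring shown is an assumption, not part of this excerpt.

  // Minimal caller sketch (assumption, modeled on upstream DeleteCommand.run): performDelete
  // is expected to run inside an OptimisticTransaction whose actions are then committed.
  final override def run(sparkSession: SparkSession): Seq[Row] = {
    recordDeltaOperation(deltaLog, "delta.dml.delete") {
      deltaLog.withNewTransaction { txn =>
        val deleteActions = performDelete(sparkSession, deltaLog, txn)
        if (deleteActions.nonEmpty) {
          // The exact DeltaOperations.Delete argument form varies across Delta versions.
          txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
        }
      }
    }
    Seq.empty[Row]
  }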


  def performDelete(
      sparkSession: SparkSession,
      deltaLog: DeltaLog,
      txn: OptimisticTransaction): Seq[Action] = {
    import org.apache.spark.sql.delta.implicits._

    var numRemovedFiles: Long = 0
    var numAddedFiles: Long = 0
    var numAddedChangeFiles: Long = 0
    var scanTimeMs: Long = 0
    var rewriteTimeMs: Long = 0
    var numAddedBytes: Long = 0
    var changeFileBytes: Long = 0
    var numRemovedBytes: Long = 0
    var numFilesBeforeSkipping: Long = 0
    var numBytesBeforeSkipping: Long = 0
    var numFilesAfterSkipping: Long = 0
    var numBytesAfterSkipping: Long = 0
    var numPartitionsAfterSkipping: Option[Long] = None
    var numPartitionsRemovedFrom: Option[Long] = None
    var numPartitionsAddedTo: Option[Long] = None
    var numDeletedRows: Option[Long] = None
    var numCopiedRows: Option[Long] = None
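    // These accumulators are filled in by whichever case runs below and copied into the SQL
    // metrics map and the "delta.dml.delete.stats" event at the end of the method. The
    // Option-valued counters are reported only when actually computed (e.g. the partition
    // counters only for partitioned tables).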

    val startTime = System.nanoTime()
    val numFilesTotal = txn.snapshot.numOfFiles

    val deleteActions: Seq[Action] = condition match {
      case None =>
        // Case 1: Delete the whole table if the condition is true
        val reportRowLevelMetrics = conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)
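        // keepNumRecords keeps the per-file row-count statistics on the returned AddFiles so
        // that numDeletedRows can be derived from metadata instead of a scan.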
        val allFiles = txn.filterFiles(Nil, keepNumRecords = reportRowLevelMetrics)

        numRemovedFiles = allFiles.size
        scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
        val (numBytes, numPartitions) = totalBytesAndDistinctPartitionValues(allFiles)
        numRemovedBytes = numBytes
        numFilesBeforeSkipping = numRemovedFiles
        numBytesBeforeSkipping = numBytes
        numFilesAfterSkipping = numRemovedFiles
        numBytesAfterSkipping = numBytes
        numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(allFiles)

        if (txn.metadata.partitionColumns.nonEmpty) {
          numPartitionsAfterSkipping = Some(numPartitions)
          numPartitionsRemovedFrom = Some(numPartitions)
          numPartitionsAddedTo = Some(0)
        }
        val operationTimestamp = System.currentTimeMillis()
        allFiles.map(_.removeWithTimestamp(operationTimestamp))
      case Some(cond) =>
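        // Split the condition into predicates that only reference partition columns (and can
        // be evaluated against the log metadata) and predicates that require reading data files.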
        val (metadataPredicates, otherPredicates) =
          DeltaTableUtils.splitMetadataAndDataPredicates(
            cond, txn.metadata.partitionColumns, sparkSession)

        numFilesBeforeSkipping = txn.snapshot.numOfFiles
        numBytesBeforeSkipping = txn.snapshot.sizeInBytes

        if (otherPredicates.isEmpty) {
          // Case 2: The condition can be evaluated using metadata only.
          //         Delete a set of files without needing to scan any data files.
          val operationTimestamp = System.currentTimeMillis()
          val reportRowLevelMetrics = conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)
          val candidateFiles =
            txn.filterFiles(metadataPredicates, keepNumRecords = reportRowLevelMetrics)

          scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
          numRemovedFiles = candidateFiles.size
          numRemovedBytes = candidateFiles.map(_.size).sum
          numFilesAfterSkipping = candidateFiles.size
          val (numCandidateBytes, numCandidatePartitions) =
            totalBytesAndDistinctPartitionValues(candidateFiles)
          numBytesAfterSkipping = numCandidateBytes
          numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(candidateFiles)

          if (txn.metadata.partitionColumns.nonEmpty) {
            numPartitionsAfterSkipping = Some(numCandidatePartitions)
            numPartitionsRemovedFrom = Some(numCandidatePartitions)
            numPartitionsAddedTo = Some(0)
          }
          candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
        } else {
          // Case 3: Delete the rows based on the condition.

          // Should we write the DVs to represent the deleted rows?
          val shouldWriteDVs = shouldWritePersistentDeletionVectors(sparkSession, txn)
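          // With persistent deletion vectors, matched rows are only marked as deleted in DV
          // files; the underlying data files are kept rather than rewritten.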

          val candidateFiles = txn.filterFiles(
            metadataPredicates ++ otherPredicates,
            keepNumRecords = shouldWriteDVs)
          // `candidateFiles` contains the files filtered using statistics and the delete
          // condition. They may or may not contain any rows that need to be deleted.

          numFilesAfterSkipping = candidateFiles.size
          val (numCandidateBytes, numCandidatePartitions) =
            totalBytesAndDistinctPartitionValues(candidateFiles)
          numBytesAfterSkipping = numCandidateBytes
          if (txn.metadata.partitionColumns.nonEmpty) {
            numPartitionsAfterSkipping = Some(numCandidatePartitions)
          }

          val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)
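          // Map from the files' absolute paths back to their AddFile entries, so paths collected
          // by the scan below can be turned into RemoveFile actions later.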

          val fileIndex = new TahoeBatchFileIndex(
            sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)
          if (shouldWriteDVs) {
            val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
              sparkSession,
              target,
              fileIndex)

            // Does the target table already have DVs enabled? If so, we need to read the table
            // with deletion vectors.
            val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)

            val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles(
              sparkSession,
              txn,
              mustReadDeletionVectors,
              deltaLog,
              targetDf,
              fileIndex,
              cond)

            if (touchedFiles.nonEmpty) {
              DeleteWithDeletionVectorsHelper.processUnmodifiedData(touchedFiles)
            } else {
              Nil // Nothing to update
            }
          } else {
            // Keep everything from the resolved target except a new TahoeFileIndex
            // that only involves the affected files instead of all files.
            val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
            val data = Dataset.ofRows(sparkSession, newTarget)
            val deletedRowCount = metrics("numDeletedRows")
            val deletedRowUdf = DeltaUDF.boolean { () =>
              deletedRowCount += 1
              true
            }.asNondeterministic()
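            // First pass: scan only the candidate files for rows matching the condition and
            // collect the distinct names of the files containing them. deletedRowUdf() always
            // returns true, so it never filters anything out; it just increments the
            // numDeletedRows metric once per matching row.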
            val filesToRewrite =
              withStatusCode("DELTA", FINDING_TOUCHED_FILES_MSG) {
                if (candidateFiles.isEmpty) {
                  Array.empty[String]
                } else {
                  // --- modified start
                  data.filter(new Column(cond))
                    .select(input_file_name().as("input_files"))
                    .filter(deletedRowUdf())
                    .distinct()
                    .as[String]
                    .collect()
                  // --- modified end
                }
              }

            numRemovedFiles = filesToRewrite.length
            scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
            if (filesToRewrite.isEmpty) {
              // Case 3.1: no row matches and no delete will be triggered
              if (txn.metadata.partitionColumns.nonEmpty) {
                numPartitionsRemovedFrom = Some(0)
                numPartitionsAddedTo = Some(0)
              }
              Nil
            } else {
              // Case 3.2: some files need an update to remove the deleted rows
              // Do the second pass and just read the affected files
              val baseRelation = buildBaseRelation(
                sparkSession, txn, "delete", deltaLog.dataPath, filesToRewrite, nameToAddFileMap)
              // Keep everything from the resolved target except a new TahoeFileIndex
              // that only involves the affected files instead of all files.
              val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
              val targetDF = Dataset.ofRows(sparkSession, newTarget)
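              // Keep only the rows that do NOT match the delete condition; the null-safe
              // comparison also retains rows where the condition evaluates to null.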
              val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
              val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length)
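              // rewriteFiles rewrites the touched files and, when CDF is enabled, also emits
              // AddCDCFile actions; split them so data files and change files are counted
              // separately.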
              val (changeFiles, rewrittenFiles) = rewrittenActions
                .partition(_.isInstanceOf[AddCDCFile])
              numAddedFiles = rewrittenFiles.size
              val removedFiles = filesToRewrite.map(f =>
                getTouchedFile(deltaLog.dataPath, f, nameToAddFileMap))
              val (removedBytes, removedPartitions) =
                totalBytesAndDistinctPartitionValues(removedFiles)
              numRemovedBytes = removedBytes
              val (rewrittenBytes, rewrittenPartitions) =
                totalBytesAndDistinctPartitionValues(rewrittenFiles)
              numAddedBytes = rewrittenBytes
              if (txn.metadata.partitionColumns.nonEmpty) {
                numPartitionsRemovedFrom = Some(removedPartitions)
                numPartitionsAddedTo = Some(rewrittenPartitions)
              }
              numAddedChangeFiles = changeFiles.size
              changeFileBytes = changeFiles.collect { case f: AddCDCFile => f.size }.sum
              rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
              numDeletedRows = Some(metrics("numDeletedRows").value)
              numCopiedRows =
                Some(metrics("numTouchedRows").value - metrics("numDeletedRows").value)

              val operationTimestamp = System.currentTimeMillis()
              removeFilesFromPaths(
                deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ rewrittenActions
            }
          }
        }
    }
    metrics("numRemovedFiles").set(numRemovedFiles)
    metrics("numAddedFiles").set(numAddedFiles)
    val executionTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
    metrics("executionTimeMs").set(executionTimeMs)
    metrics("scanTimeMs").set(scanTimeMs)
    metrics("rewriteTimeMs").set(rewriteTimeMs)
    metrics("numAddedChangeFiles").set(numAddedChangeFiles)
    metrics("changeFileBytes").set(changeFileBytes)
    metrics("numAddedBytes").set(numAddedBytes)
    metrics("numRemovedBytes").set(numRemovedBytes)
    metrics("numFilesBeforeSkipping").set(numFilesBeforeSkipping)
    metrics("numBytesBeforeSkipping").set(numBytesBeforeSkipping)
    metrics("numFilesAfterSkipping").set(numFilesAfterSkipping)
    metrics("numBytesAfterSkipping").set(numBytesAfterSkipping)
    numPartitionsAfterSkipping.foreach(metrics("numPartitionsAfterSkipping").set)
    numPartitionsAddedTo.foreach(metrics("numPartitionsAddedTo").set)
    numPartitionsRemovedFrom.foreach(metrics("numPartitionsRemovedFrom").set)
    numCopiedRows.foreach(metrics("numCopiedRows").set)
    txn.registerSQLMetrics(sparkSession, metrics)
    sendDriverMetrics(sparkSession, metrics)

    recordDeltaEvent(
      deltaLog,
      "delta.dml.delete.stats",
      data = DeleteMetric(
        condition = condition.map(_.sql).getOrElse("true"),
        numFilesTotal,
        numFilesAfterSkipping,
        numAddedFiles,
        numRemovedFiles,
        numAddedFiles,
        numAddedChangeFiles = numAddedChangeFiles,
        numFilesBeforeSkipping,
        numBytesBeforeSkipping,
        numFilesAfterSkipping,
        numBytesAfterSkipping,
        numPartitionsAfterSkipping,
        numPartitionsAddedTo,
        numPartitionsRemovedFrom,
        numCopiedRows,
        numDeletedRows,
        numAddedBytes,
        numRemovedBytes,
        changeFileBytes = changeFileBytes,
        scanTimeMs,
        rewriteTimeMs)
    )

    if (deleteActions.nonEmpty) {
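      // createSetTransaction may prepend a SetTransaction action (used to de-duplicate
      // idempotent writes when a transaction app id/version is configured); it returns None
      // when no such options are set.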
      createSetTransaction(sparkSession, deltaLog).toSeq ++ deleteActions
    } else {
      Seq.empty
    }
  }