in backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala [156:433]
def performDelete(
sparkSession: SparkSession,
deltaLog: DeltaLog,
txn: OptimisticTransaction): Seq[Action] = {
import org.apache.spark.sql.delta.implicits._
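// Metric accumulators; they are copied into the SQL metrics and into the
// delta.dml.delete.stats usage event once the delete actions have been computed.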
var numRemovedFiles: Long = 0
var numAddedFiles: Long = 0
var numAddedChangeFiles: Long = 0
var scanTimeMs: Long = 0
var rewriteTimeMs: Long = 0
var numAddedBytes: Long = 0
var changeFileBytes: Long = 0
var numRemovedBytes: Long = 0
var numFilesBeforeSkipping: Long = 0
var numBytesBeforeSkipping: Long = 0
var numFilesAfterSkipping: Long = 0
var numBytesAfterSkipping: Long = 0
var numPartitionsAfterSkipping: Option[Long] = None
var numPartitionsRemovedFrom: Option[Long] = None
var numPartitionsAddedTo: Option[Long] = None
var numDeletedRows: Option[Long] = None
var numCopiedRows: Option[Long] = None
var numDeletionVectorsAdded: Long = 0
var numDeletionVectorsRemoved: Long = 0
var numDeletionVectorsUpdated: Long = 0
val startTime = System.nanoTime()
val numFilesTotal = txn.snapshot.numOfFiles
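// Three cases: no condition (remove every file), a condition that only touches
// partition columns (metadata-only delete), and a condition that requires scanning data files.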
val deleteActions: Seq[Action] = condition match {
case None =>
// Case 1: No condition was given, so delete the whole table (the predicate is trivially true)
val reportRowLevelMetrics = conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)
val allFiles = txn.filterFiles(Nil, keepNumRecords = reportRowLevelMetrics)
numRemovedFiles = allFiles.size
numDeletionVectorsRemoved = allFiles.count(_.deletionVector != null)
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
val (numBytes, numPartitions) = totalBytesAndDistinctPartitionValues(allFiles)
numRemovedBytes = numBytes
numFilesBeforeSkipping = numRemovedFiles
numBytesBeforeSkipping = numBytes
numFilesAfterSkipping = numRemovedFiles
numBytesAfterSkipping = numBytes
numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(allFiles)
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsAfterSkipping = Some(numPartitions)
numPartitionsRemovedFrom = Some(numPartitions)
numPartitionsAddedTo = Some(0)
}
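// Nothing is rewritten; every AddFile in the snapshot is simply turned into a RemoveFile.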
val operationTimestamp = System.currentTimeMillis()
allFiles.map(_.removeWithTimestamp(operationTimestamp))
case Some(cond) =>
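// Split the condition into predicates on partition columns, which can be evaluated
// against file metadata alone, and predicates that require reading the data.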
val (metadataPredicates, otherPredicates) =
DeltaTableUtils.splitMetadataAndDataPredicates(
cond, txn.metadata.partitionColumns, sparkSession)
numFilesBeforeSkipping = txn.snapshot.numOfFiles
numBytesBeforeSkipping = txn.snapshot.sizeInBytes
if (otherPredicates.isEmpty) {
// Case 2: The condition can be evaluated using metadata only.
// Delete a set of files without needing to scan any data files.
val operationTimestamp = System.currentTimeMillis()
val reportRowLevelMetrics = conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)
val candidateFiles =
txn.filterFiles(metadataPredicates, keepNumRecords = reportRowLevelMetrics)
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
numRemovedFiles = candidateFiles.size
numRemovedBytes = candidateFiles.map(_.size).sum
numFilesAfterSkipping = candidateFiles.size
numDeletionVectorsRemoved = candidateFiles.count(_.deletionVector != null)
val (numCandidateBytes, numCandidatePartitions) =
totalBytesAndDistinctPartitionValues(candidateFiles)
numBytesAfterSkipping = numCandidateBytes
numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(candidateFiles)
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsAfterSkipping = Some(numCandidatePartitions)
numPartitionsRemovedFrom = Some(numCandidatePartitions)
numPartitionsAddedTo = Some(0)
}
candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
} else {
// Case 3: Delete the rows based on the condition.
// Should we write the DVs to represent the deleted rows?
val shouldWriteDVs = shouldWritePersistentDeletionVectors(sparkSession, txn)
val candidateFiles = txn.filterFiles(
metadataPredicates ++ otherPredicates,
keepNumRecords = shouldWriteDVs)
// `candidateFiles` contains the files filtered using statistics and the delete condition.
// They may or may not contain any rows that need to be deleted.
numFilesAfterSkipping = candidateFiles.size
val (numCandidateBytes, numCandidatePartitions) =
totalBytesAndDistinctPartitionValues(candidateFiles)
numBytesAfterSkipping = numCandidateBytes
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsAfterSkipping = Some(numCandidatePartitions)
}
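// Map each candidate path back to its AddFile and build a file index scoped to the
// candidate files only, so the scans below never touch any other files.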
val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)
val fileIndex = new TahoeBatchFileIndex(
sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)
if (shouldWriteDVs) {
val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
sparkSession,
target,
fileIndex)
// Does the target table already have DVs enabled? If so, we need to read the table
// with deletion vectors.
val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
val touchedFiles = DMLWithDeletionVectorsHelper.findTouchedFiles(
sparkSession,
txn,
mustReadDeletionVectors,
deltaLog,
targetDf,
fileIndex,
cond,
opName = "DELETE")
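// Files with at least one matching row; they are turned into remove/add actions that
// carry the new or updated deletion vectors instead of rewriting any data.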
if (touchedFiles.nonEmpty) {
val (actions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
sparkSession,
touchedFiles,
txn.snapshot)
metrics("numDeletedRows").set(metricMap("numModifiedRows"))
numDeletionVectorsAdded = metricMap("numDeletionVectorsAdded")
numDeletionVectorsRemoved = metricMap("numDeletionVectorsRemoved")
numDeletionVectorsUpdated = metricMap("numDeletionVectorsUpdated")
numRemovedFiles = metricMap("numRemovedFiles")
actions
} else {
Nil // Nothing to update
}
} else {
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
val data = Dataset.ofRows(sparkSession, newTarget)
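// IncrementMetric evaluates to true but bumps numDeletedRows every time it is
// evaluated, so using it as a filter counts the matching rows as a side effect.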
val incrDeletedCountExpr = IncrementMetric(TrueLiteral, metrics("numDeletedRows"))
val filesToRewrite =
withStatusCode("DELTA", FINDING_TOUCHED_FILES_MSG) {
if (candidateFiles.isEmpty) {
Array.empty[String]
} else {
// --- modified start
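// First pass: evaluate the condition and collect the distinct names of the files
// that contain at least one matching row; only those files are rewritten below.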
data.filter(new Column(cond))
.select(input_file_name().as("input_files"))
.filter(new Column(incrDeletedCountExpr))
.distinct()
.as[String]
.collect()
// --- modified end
}
}
numRemovedFiles = filesToRewrite.length
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
if (filesToRewrite.isEmpty) {
// Case 3.1: no row matches and no delete will be triggered
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsRemovedFrom = Some(0)
numPartitionsAddedTo = Some(0)
}
Nil
} else {
// Case 3.2: some files need to be rewritten to remove the deleted rows
// Do the second pass and just read the affected files
val baseRelation = buildBaseRelation(
sparkSession, txn, "delete", deltaLog.dataPath, filesToRewrite, nameToAddFileMap)
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
val targetDF = RowTracking.preserveRowTrackingColumns(
dfWithoutRowTrackingColumns = Dataset.ofRows(sparkSession, newTarget),
snapshot = txn.snapshot)
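// Keep the rows that do NOT match the delete condition (false or null), i.e. the
// rows that must survive the rewrite.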
val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length)
val (changeFiles, rewrittenFiles) = rewrittenActions
.partition(_.isInstanceOf[AddCDCFile])
numAddedFiles = rewrittenFiles.size
val removedFiles = filesToRewrite.map(f =>
getTouchedFile(deltaLog.dataPath, f, nameToAddFileMap))
val (removedBytes, removedPartitions) =
totalBytesAndDistinctPartitionValues(removedFiles)
numRemovedBytes = removedBytes
val (rewrittenBytes, rewrittenPartitions) =
totalBytesAndDistinctPartitionValues(rewrittenFiles)
numAddedBytes = rewrittenBytes
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsRemovedFrom = Some(removedPartitions)
numPartitionsAddedTo = Some(rewrittenPartitions)
}
numAddedChangeFiles = changeFiles.size
changeFileBytes = changeFiles.collect { case f: AddCDCFile => f.size }.sum
rewriteTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) - scanTimeMs
numDeletedRows = Some(metrics("numDeletedRows").value)
numCopiedRows =
Some(metrics("numTouchedRows").value - metrics("numDeletedRows").value)
numDeletionVectorsRemoved = removedFiles.count(_.deletionVector != null)
val operationTimestamp = System.currentTimeMillis()
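// Remove the original files and append the rewritten data files plus any CDC files.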
removeFilesFromPaths(
deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ rewrittenActions
}
}
}
}
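// Publish the collected values through the SQL metrics so they show up in the Spark UI.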
metrics("numRemovedFiles").set(numRemovedFiles)
metrics("numAddedFiles").set(numAddedFiles)
val executionTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
metrics("executionTimeMs").set(executionTimeMs)
metrics("scanTimeMs").set(scanTimeMs)
metrics("rewriteTimeMs").set(rewriteTimeMs)
metrics("numAddedChangeFiles").set(numAddedChangeFiles)
metrics("changeFileBytes").set(changeFileBytes)
metrics("numAddedBytes").set(numAddedBytes)
metrics("numRemovedBytes").set(numRemovedBytes)
metrics("numFilesBeforeSkipping").set(numFilesBeforeSkipping)
metrics("numBytesBeforeSkipping").set(numBytesBeforeSkipping)
metrics("numFilesAfterSkipping").set(numFilesAfterSkipping)
metrics("numBytesAfterSkipping").set(numBytesAfterSkipping)
metrics("numDeletionVectorsAdded").set(numDeletionVectorsAdded)
metrics("numDeletionVectorsRemoved").set(numDeletionVectorsRemoved)
metrics("numDeletionVectorsUpdated").set(numDeletionVectorsUpdated)
numPartitionsAfterSkipping.foreach(metrics("numPartitionsAfterSkipping").set)
numPartitionsAddedTo.foreach(metrics("numPartitionsAddedTo").set)
numPartitionsRemovedFrom.foreach(metrics("numPartitionsRemovedFrom").set)
numCopiedRows.foreach(metrics("numCopiedRows").set)
txn.registerSQLMetrics(sparkSession, metrics)
sendDriverMetrics(sparkSession, metrics)
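// Also record the same statistics as a Delta usage event for logging/telemetry.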
recordDeltaEvent(
deltaLog,
"delta.dml.delete.stats",
data = DeleteMetric(
condition = condition.map(_.sql).getOrElse("true"),
numFilesTotal,
numFilesAfterSkipping,
numAddedFiles,
numRemovedFiles,
numAddedFiles,
numAddedChangeFiles = numAddedChangeFiles,
numFilesBeforeSkipping,
numBytesBeforeSkipping,
numFilesAfterSkipping,
numBytesAfterSkipping,
numPartitionsAfterSkipping,
numPartitionsAddedTo,
numPartitionsRemovedFrom,
numCopiedRows,
numDeletedRows,
numAddedBytes,
numRemovedBytes,
changeFileBytes = changeFileBytes,
scanTimeMs,
rewriteTimeMs,
numDeletionVectorsAdded,
numDeletionVectorsRemoved,
numDeletionVectorsUpdated)
)
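// Prepend a SetTransaction action (used for idempotent writes) only when the delete
// produced actions; otherwise return no actions at all.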
if (deleteActions.nonEmpty) {
createSetTransaction(sparkSession, deltaLog).toSeq ++ deleteActions
} else {
Seq.empty
}
}