in backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala [115:329]
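/**
 * Scans the target table for rows matching the update condition, rewrites the touched files
 * (or, when persistent deletion vectors are enabled, masks the superseded rows with DVs and
 * writes only the updated rows), commits the resulting actions, and records SQL metrics plus
 * a `delta.dml.update.stats` usage event.
 */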
private def performUpdate(
sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction): Unit = {
import org.apache.spark.sql.delta.implicits._
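// Local counters for operation statistics; they back the SQL metrics set below and the
// UpdateMetric event recorded at the end of the method.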
var numTouchedFiles: Long = 0
var numRewrittenFiles: Long = 0
var numAddedBytes: Long = 0
var numRemovedBytes: Long = 0
var numAddedChangeFiles: Long = 0
var changeFileBytes: Long = 0
var scanTimeMs: Long = 0
var rewriteTimeMs: Long = 0
var numDeletionVectorsAdded: Long = 0
var numDeletionVectorsRemoved: Long = 0
var numDeletionVectorsUpdated: Long = 0
val startTime = System.nanoTime()
val numFilesTotal = txn.snapshot.numOfFiles
val updateCondition = condition.getOrElse(Literal.TrueLiteral)
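// Split the condition into predicates over partition columns, which can prune files without
// reading data, and data predicates, which require scanning the remaining files to find the
// affected rows.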
val (metadataPredicates, dataPredicates) =
DeltaTableUtils.splitMetadataAndDataPredicates(
updateCondition, txn.metadata.partitionColumns, sparkSession)
// Should we write the DVs to represent updated rows?
val shouldWriteDeletionVectors = shouldWritePersistentDeletionVectors(sparkSession, txn)
val candidateFiles = txn.filterFiles(
metadataPredicates ++ dataPredicates,
keepNumRecords = shouldWriteDeletionVectors)
val nameToAddFile = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)
scanTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
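// Illustrative example (hypothetical table and columns): with a table partitioned by `dt`,
//   UPDATE t SET v = 0 WHERE dt = '2024-01-01'              -- Case 2: partition predicate only
//   UPDATE t SET v = 0 WHERE dt = '2024-01-01' AND id = 42  -- Case 3: partition + data predicate
// A condition whose predicates match no files at all falls into Case 1.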
val filesToRewrite: Seq[TouchedFileWithDV] = if (candidateFiles.isEmpty) {
// Case 1: Do nothing when no file matches the partition predicates
// that are part of the update condition
Nil
} else if (dataPredicates.isEmpty) {
// Case 2: Update all the rows from the files that are in the specified partitions
// when the data filter is empty
candidateFiles
.map(f => TouchedFileWithDV(f.path, f, newDeletionVector = null, deletedRows = 0L))
} else {
// Case 3: Find all the affected files using the user-specified condition
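// Restrict the scan below to the candidate files that survived the file-level filtering above.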
val fileIndex = new TahoeBatchFileIndex(
sparkSession, "update", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)
val touchedFilesWithDV = if (shouldWriteDeletionVectors) {
// Case 3.1: Find all the affected files via DV path
val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
sparkSession,
target,
fileIndex)
// Does the target table already have DVs enabled? If so, the table must be read with the
// existing deletion vectors applied, so that rows they mark as deleted are not matched again.
val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
DMLWithDeletionVectorsHelper.findTouchedFiles(
sparkSession,
txn,
mustReadDeletionVectors,
deltaLog,
targetDf,
fileIndex,
updateCondition,
opName = "UPDATE")
} else {
// Case 3.2: Find all the affected files using the non-DV path
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the candidate files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
val data = Dataset.ofRows(sparkSession, newTarget)
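// IncrementMetric evaluates to true and, as a side effect, bumps the `numUpdatedRows` metric
// each time it is evaluated, so matched rows are counted while their files are collected.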
val incrUpdatedCountExpr = IncrementMetric(TrueLiteral, metrics("numUpdatedRows"))
val pathsToRewrite =
withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) {
// --- modified start
data.filter(new Column(updateCondition))
.select(input_file_name().as("input_files"))
.filter(new Column(incrUpdatedCountExpr))
.distinct()
.as[String]
.collect()
// --- modified end
}
// Wrap AddFile into TouchedFileWithDV that has empty DV.
pathsToRewrite
.map(getTouchedFile(deltaLog.dataPath, _, nameToAddFile))
.map(f => TouchedFileWithDV(f.path, f, newDeletionVector = null, deletedRows = 0L))
.toSeq
}
// Refresh the scan time for Case 3, since the scan was performed here.
scanTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
touchedFilesWithDV
}
val totalActions = {
// When DV is on, we first mask removed rows with DVs and generate (remove, add) pairs.
val actionsForExistingFiles = if (shouldWriteDeletionVectors) {
// When there's no data predicate, all matched files are removed.
if (dataPredicates.isEmpty) {
val operationTimestamp = System.currentTimeMillis()
filesToRewrite.map(_.fileLogEntry.removeWithTimestamp(operationTimestamp))
} else {
// When there is a data predicate, we generate (remove, add) pairs.
val filesToRewriteWithDV = filesToRewrite.filter(_.newDeletionVector != null)
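// Only files that actually received a new deletion vector need action pairs;
// processUnmodifiedData turns each of them into a (remove, add-with-DV) pair and reports
// the row/file/DV counts that are copied into the local counters below.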
val (dvActions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
sparkSession,
filesToRewriteWithDV,
txn.snapshot)
metrics("numUpdatedRows").set(metricMap("numModifiedRows"))
numDeletionVectorsAdded = metricMap("numDeletionVectorsAdded")
numDeletionVectorsRemoved = metricMap("numDeletionVectorsRemoved")
numDeletionVectorsUpdated = metricMap("numDeletionVectorsUpdated")
numTouchedFiles = metricMap("numRemovedFiles")
dvActions
}
} else {
// Without DVs, we leave the job to `rewriteFiles`.
Nil
}
// When DV is on, we write out updated rows only. The return value will be only `add` actions.
// When DV is off, we write out updated rows plus unmodified rows from the same file, then
// return `add` and `remove` actions.
val rewriteStartNs = System.nanoTime()
val actionsForNewFiles =
withStatusCode("DELTA", UpdateCommand.rewritingFilesMsg(filesToRewrite.size)) {
if (filesToRewrite.nonEmpty) {
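// With DVs enabled, only the updated rows are written: unmodified rows stay readable in the
// original files behind the new deletion vectors, and the remove actions were already
// produced above, so both flags below are turned off.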
rewriteFiles(
sparkSession,
txn,
rootPath = tahoeFileIndex.path,
inputLeafFiles = filesToRewrite.map(_.fileLogEntry),
nameToAddFileMap = nameToAddFile,
condition = updateCondition,
generateRemoveFileActions = !shouldWriteDeletionVectors,
copyUnmodifiedRows = !shouldWriteDeletionVectors)
} else {
Nil
}
}
rewriteTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - rewriteStartNs)
numTouchedFiles = filesToRewrite.length
val (addActions, removeActions) = actionsForNewFiles.partition(_.isInstanceOf[AddFile])
numRewrittenFiles = addActions.size
numAddedBytes = addActions.map(_.getFileSize).sum
numRemovedBytes = removeActions.map(_.getFileSize).sum
actionsForExistingFiles ++ actionsForNewFiles
}
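// AddCDCFile actions are emitted when Change Data Feed is enabled; track their count and
// size separately from the data files.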
val changeActions = totalActions.collect { case f: AddCDCFile => f }
numAddedChangeFiles = changeActions.size
changeFileBytes = changeActions.map(_.size).sum
metrics("numAddedFiles").set(numRewrittenFiles)
metrics("numAddedBytes").set(numAddedBytes)
metrics("numAddedChangeFiles").set(numAddedChangeFiles)
metrics("changeFileBytes").set(changeFileBytes)
metrics("numRemovedFiles").set(numTouchedFiles)
metrics("numRemovedBytes").set(numRemovedBytes)
metrics("executionTimeMs").set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime))
metrics("scanTimeMs").set(scanTimeMs)
metrics("rewriteTimeMs").set(rewriteTimeMs)
// In the case where the numUpdatedRows is not captured, we can siphon out the metrics from
// the BasicWriteStatsTracker. This is for case 2 where the update condition contains only
// metadata predicates and so the entire partition is re-written.
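// The -1 default distinguishes "tracker metric unavailable" from an actual zero-row write.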
val outputRows = txn.getMetric("numOutputRows").map(_.value).getOrElse(-1L)
if (metrics("numUpdatedRows").value == 0 && outputRows != 0 &&
metrics("numCopiedRows").value == 0) {
// We know that numTouchedRows = numCopiedRows + numUpdatedRows.
// Since an entire partition was re-written, no rows were copied.
// So numTouchedRows == numUpdatedRows
metrics("numUpdatedRows").set(metrics("numTouchedRows").value)
} else {
// This is for case 3 where the update condition contains both metadata and data predicates
// so relevant files will have some rows updated and some rows copied. We don't need to
// consider case 1 here, where no files match the update condition, as we know that
// `totalActions` is empty.
metrics("numCopiedRows").set(
metrics("numTouchedRows").value - metrics("numUpdatedRows").value)
metrics("numDeletionVectorsAdded").set(numDeletionVectorsAdded)
metrics("numDeletionVectorsRemoved").set(numDeletionVectorsRemoved)
metrics("numDeletionVectorsUpdated").set(numDeletionVectorsUpdated)
}
txn.registerSQLMetrics(sparkSession, metrics)
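// A SetTransaction action is prepended only when idempotent-write options are configured, so
// the commit below carries both the transaction identity and the update's file actions.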
val finalActions = createSetTransaction(sparkSession, deltaLog).toSeq ++ totalActions
txn.commitIfNeeded(
actions = finalActions,
op = DeltaOperations.Update(condition),
tags = RowTracking.addPreservedRowTrackingTagIfNotSet(txn.snapshot))
sendDriverMetrics(sparkSession, metrics)
recordDeltaEvent(
deltaLog,
"delta.dml.update.stats",
data = UpdateMetric(
condition = condition.map(_.sql).getOrElse("true"),
numFilesTotal,
numTouchedFiles,
numRewrittenFiles,
numAddedChangeFiles,
changeFileBytes,
scanTimeMs,
rewriteTimeMs,
numDeletionVectorsAdded,
numDeletionVectorsRemoved,
numDeletionVectorsUpdated)
)
}