in backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala [89:250]
private def performUpdate(
    sparkSession: SparkSession,
    deltaLog: DeltaLog,
    txn: OptimisticTransaction): Unit = {
  import sparkSession.implicits._
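
  // Per-run counters; they back the SQL metrics set below and the
  // delta.dml.update.stats event recorded at the end of this method.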
  var numTouchedFiles: Long = 0
  var numRewrittenFiles: Long = 0
  var numAddedChangeFiles: Long = 0
  var changeFileBytes: Long = 0
  var scanTimeMs: Long = 0
  var rewriteTimeMs: Long = 0

  val startTime = System.nanoTime()
  val numFilesTotal = deltaLog.snapshot.numOfFiles
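
  // Split the UPDATE condition into partition (metadata) predicates and data
  // predicates; both are passed to filterFiles to narrow the candidate file set.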
  val updateCondition = condition.getOrElse(Literal.TrueLiteral)
  val (metadataPredicates, dataPredicates) =
    DeltaTableUtils.splitMetadataAndDataPredicates(
      updateCondition,
      txn.metadata.partitionColumns,
      sparkSession)
  val candidateFiles = txn.filterFiles(metadataPredicates ++ dataPredicates)
  val nameToAddFile = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)

  scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
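
  // Decide which files have to be rewritten. Three cases follow, depending on whether
  // no candidate file survives the predicates, the condition touches whole partitions
  // only, or individual rows have to be checked.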
  val filesToRewrite: Seq[AddFile] = if (candidateFiles.isEmpty) {
    // Case 1: Do nothing if no file matches the partition predicates
    // that are part of the UPDATE condition.
    Nil
  } else if (dataPredicates.isEmpty) {
    // Case 2: Update all the rows from the files that are in the specified partitions
    // when the data filter is empty.
    candidateFiles
  } else {
    // Case 3: Find all the affected files using the user-specified condition.
    val fileIndex = new TahoeBatchFileIndex(
      sparkSession,
      "update",
      candidateFiles,
      deltaLog,
      tahoeFileIndex.path,
      txn.snapshot)
    // Keep everything from the resolved target except a new TahoeFileIndex
    // that only involves the affected files instead of all files.
    val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
    val data = Dataset.ofRows(sparkSession, newTarget)
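
    // Side-effecting, non-deterministic UDF: it bumps the numUpdatedRows SQL metric
    // for every row that passes the UPDATE condition. Marking it non-deterministic
    // keeps the optimizer from eliminating or reordering the call.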
val updatedRowCount = metrics("numUpdatedRows")
val updatedRowUdf = udf {
() =>
updatedRowCount += 1
true
}.asNondeterministic()
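
    // Scan only the candidate files and collect the distinct names of the files that
    // actually contain matching rows; these are the "touched" files to rewrite. The
    // block between the `--- modified` markers is where this backend deviates from
    // the upstream Delta implementation of touched-file discovery.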
    val pathsToRewrite =
      withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) {
        // --- modified start
        data
          .filter(new Column(updateCondition))
          .filter(updatedRowUdf())
          .select(input_file_name().as("input_files"))
          .distinct()
          .as[String]
          .collect()
        // --- modified end
      }

    scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

    pathsToRewrite.map(getTouchedFile(deltaLog.dataPath, _, nameToAddFile)).toSeq
  }
  numTouchedFiles = filesToRewrite.length

  val newActions = if (filesToRewrite.isEmpty) {
    // Do nothing if no row matches the UPDATE condition.
    Nil
  } else {
    // Generate the new files containing the updated values.
    withStatusCode("DELTA", UpdateCommand.rewritingFilesMsg(filesToRewrite.size)) {
      rewriteFiles(
        sparkSession,
        txn,
        tahoeFileIndex.path,
        filesToRewrite.map(_.path),
        nameToAddFile,
        updateCondition)
    }
  }

  rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
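
  // Split the new actions into CDC change files (AddCDCFile) and rewritten data
  // files; the two groups feed separate metrics below.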
  val (changeActions, addActions) = newActions.partition(_.isInstanceOf[AddCDCFile])
  numRewrittenFiles = addActions.size
  numAddedChangeFiles = changeActions.size
  changeFileBytes = changeActions.collect { case f: AddCDCFile => f.size }.sum

  val totalActions = if (filesToRewrite.isEmpty) {
    // Do nothing if no row matches the UPDATE condition.
    Nil
  } else {
    // Delete the old files and return those delete actions along with the new AddFile
    // actions for the files containing the updated values.
    val operationTimestamp = System.currentTimeMillis()
    val deleteActions = filesToRewrite.map(_.removeWithTimestamp(operationTimestamp))
    deleteActions ++ newActions
  }
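
  // Only register metrics and commit when there is something to commit; a no-op
  // UPDATE leaves the table version unchanged.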
  if (totalActions.nonEmpty) {
    metrics("numAddedFiles").set(numRewrittenFiles)
    metrics("numAddedChangeFiles").set(numAddedChangeFiles)
    metrics("changeFileBytes").set(changeFileBytes)
    metrics("numRemovedFiles").set(numTouchedFiles)
    metrics("executionTimeMs").set((System.nanoTime() - startTime) / 1000 / 1000)
    metrics("scanTimeMs").set(scanTimeMs)
    metrics("rewriteTimeMs").set(rewriteTimeMs)

    // In the case where numUpdatedRows is not captured, we can siphon out the metrics
    // from the BasicWriteStatsTracker. This is for case 2, where the update condition
    // contains only metadata predicates and so the entire partition is rewritten.
    val outputRows = txn.getMetric("numOutputRows").map(_.value).getOrElse(-1L)
    if (
      metrics("numUpdatedRows").value == 0 && outputRows != 0 &&
      metrics("numCopiedRows").value == 0
    ) {
      // We know that numTouchedRows = numCopiedRows + numUpdatedRows.
      // Since an entire partition was rewritten, no rows were copied,
      // so numTouchedRows == numUpdatedRows.
      metrics("numUpdatedRows").set(metrics("numTouchedRows").value)
    } else {
      // This is for case 3, where the update condition contains both metadata and data
      // predicates, so relevant files will have some rows updated and some rows copied.
      // We don't need to consider case 1 here, where no files match the update
      // condition, as we know that `totalActions` is empty.
      metrics("numCopiedRows").set(
        metrics("numTouchedRows").value - metrics("numUpdatedRows").value)
    }

    txn.registerSQLMetrics(sparkSession, metrics)
    txn.commit(totalActions, DeltaOperations.Update(condition.map(_.toString)))

    // This is needed to make the SQL metrics visible in the Spark UI.
    val executionId = sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(
      sparkSession.sparkContext,
      executionId,
      metrics.values.toSeq)
  }
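
  // The usage event is recorded unconditionally, so even a no-op UPDATE shows up in
  // the delta.dml.update.stats logs.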
  recordDeltaEvent(
    deltaLog,
    "delta.dml.update.stats",
    data = UpdateMetric(
      condition = condition.map(_.sql).getOrElse("true"),
      numFilesTotal,
      numTouchedFiles,
      numRewrittenFiles,
      numAddedChangeFiles,
      changeFileBytes,
      scanTimeMs,
      rewriteTimeMs
    )
  )
}
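
For context, a minimal sketch of statements that exercise the scan cases above, assuming a SparkSession `spark` with the Delta extensions enabled and a hypothetical table `t` partitioned by `date` (table and column names are illustrative, not taken from this file):

// Hypothetical setup, for illustration only.
spark.sql("CREATE TABLE t (id BIGINT, value STRING, date STRING) USING delta PARTITIONED BY (date)")

// Case 2: the condition references only the partition column `date`, so
// dataPredicates is empty and every candidate file in the partition is rewritten.
spark.sql("UPDATE t SET value = 'x' WHERE date = '2024-01-01'")

// Case 3: the condition mixes a partition predicate with a data predicate on `id`,
// so the touched files are discovered by first scanning the candidate files.
spark.sql("UPDATE t SET value = 'y' WHERE date = '2024-01-01' AND id % 2 = 0")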