in backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v1/clickhouse/commands/WriteMergeTreeToDelta.scala [178:379]
def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
import org.apache.spark.sql.delta.implicits._
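// SaveMode semantics against an existing table (readVersion > -1): ErrorIfExists fails
// fast, Ignore becomes a no-op that commits no actions, and Overwrite is only allowed
// when the table is removable (i.e. not append-only).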
if (txn.readVersion > -1) {
// This table already exists, check if the insert is valid.
if (mode == SaveMode.ErrorIfExists) {
throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
} else if (mode == SaveMode.Ignore) {
return Nil
} else if (mode == SaveMode.Overwrite) {
deltaLog.assertRemovable()
}
}
val rearrangeOnly = options.rearrangeOnly
// Delta does not support char padding and we should only have varchar type. This does not
// change the actual behavior, but makes DESC TABLE show varchar instead of char.
val dataSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
replaceCharWithVarchar(CharVarcharUtils.getRawSchema(data.schema)).asInstanceOf[StructType])
val finalSchema = schemaInCatalog.getOrElse(dataSchema)
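// updateMetadata reconciles the write schema with the table's current schema (handling
// schema evolution or overwriteSchema) and records partition columns and table
// configuration on the transaction.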
updateMetadata(
data.sparkSession,
txn,
finalSchema,
partitionColumns,
configuration,
isOverwriteOperation,
rearrangeOnly)
val replaceOnDataColsEnabled =
sparkSession.conf.get(DeltaSQLConf.REPLACEWHERE_DATACOLUMNS_ENABLED)
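// Dynamic partition overwrite only replaces the partitions this write touches.
// Illustrative (hypothetical) usage that would enable it, assuming a table partitioned
// by `date`:
//   df.write.format("delta")
//     .mode("overwrite")
//     .option("partitionOverwriteMode", "dynamic")
//     .save(path)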
val useDynamicPartitionOverwriteMode = {
if (txn.metadata.partitionColumns.isEmpty) {
// We ignore dynamic partition overwrite mode for non-partitioned tables
false
} else if (options.replaceWhere.nonEmpty) {
if (options.partitionOverwriteModeInOptions && options.isDynamicPartitionOverwriteMode) {
// replaceWhere and dynamic partition overwrite conflict because they both specify which
// data to overwrite. We throw an error when:
// 1. replaceWhere is provided in a DataFrameWriter option
// 2. partitionOverwriteMode is set to "dynamic" in a DataFrameWriter option
throw DeltaErrors.replaceWhereUsedWithDynamicPartitionOverwrite()
} else {
// If replaceWhere is provided, we do not use dynamic partition overwrite, even if it is
// enabled in the Spark session configuration, since query-specific options generally
// take precedence over session configs.
false
}
} else options.isDynamicPartitionOverwriteMode
}
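// Illustrative example, assuming the table is partitioned by `date`: a writer option
// replaceWhere = "date = '2024-01-01' AND id > 0" is split below into a metadata
// (partition) predicate on `date` and a data predicate on `id`; the data predicate is
// only honored when REPLACEWHERE_DATACOLUMNS_ENABLED is set.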
// Validate partition predicates
var containsDataFilters = false
val replaceWhere = options.replaceWhere.flatMap {
replace =>
val parsed = parsePredicates(sparkSession, replace)
if (replaceOnDataColsEnabled) {
// Split the predicate into partition (metadata) predicates and data predicates.
val (metadataPredicates, dataFilters) = DeltaTableUtils.splitMetadataAndDataPredicates(
parsed.head,
txn.metadata.partitionColumns,
sparkSession)
if (rearrangeOnly && dataFilters.nonEmpty) {
throw DeltaErrors.replaceWhereWithFilterDataChangeUnset(dataFilters.mkString(","))
}
containsDataFilters = dataFilters.nonEmpty
Some(metadataPredicates ++ dataFilters)
} else if (mode == SaveMode.Overwrite) {
verifyPartitionPredicates(sparkSession, txn.metadata.partitionColumns, parsed)
Some(parsed)
} else {
None
}
}
if (txn.readVersion < 0) {
// Initialize the log path
deltaLog.createLogDirectory()
}
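// Four write paths, selected on (mode, replaceWhere):
// 1. Overwrite with replaceWhere on partition columns only (arbitrary replaceWhere
//    disabled): validate the new files against the predicates, then remove matches.
// 2. Overwrite with arbitrary replaceWhere: enforce the condition as a constraint on
//    the new data and delete the matching rows.
// 3. Plain overwrite: remove everything, or only the touched partitions under dynamic
//    partition overwrite.
// 4. Anything else (e.g. append): write through the ClickHouse MergeTree writer.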
val (newFiles, addFiles, deletedFiles) = (mode, replaceWhere) match {
case (SaveMode.Overwrite, Some(predicates)) if !replaceOnDataColsEnabled =>
// Fall back to matching on partition columns only when replaceWhere on data columns
// (replaceOnDataColsEnabled) is disabled.
val newFiles = txn.writeFiles(data, Some(options))
val addFiles = newFiles.collect { case a: AddFile => a }
// Check to make sure the files we wrote out were actually valid.
val matchingFiles = DeltaLog
.filterFileList(txn.metadata.partitionSchema, addFiles.toDF(sparkSession), predicates)
.as[AddFile]
.collect()
val invalidFiles = addFiles.toSet -- matchingFiles
if (invalidFiles.nonEmpty) {
val badPartitions = invalidFiles
.map(_.partitionValues)
.map {
_.map { case (k, v) => s"$k=$v" }.mkString("/")
}
.mkString(", ")
throw DeltaErrors.replaceWhereMismatchException(options.replaceWhere.get, badPartitions)
}
(newFiles, addFiles, txn.filterFiles(predicates).map(_.remove))
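// Arbitrary replaceWhere: the condition is compiled into constraints enforced while
// writing the new data, and rows matching the condition are removed from the table.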
case (SaveMode.Overwrite, Some(condition)) if txn.snapshot.version >= 0 =>
val constraints = extractConstraints(sparkSession, condition)
val removedFileActions = removeFiles(sparkSession, txn, condition)
val cdcExistsInRemoveOp = removedFileActions.exists(_.isInstanceOf[AddCDCFile])
// The above REMOVE will not produce explicit CDF data when persistent DV is enabled.
// Therefore here we need to decide whether to produce explicit CDF for INSERTs, because
// the CDF protocol requires either (i) all CDF data are generated explicitly as AddCDCFile,
// or (ii) all CDF data can be deduced from [[AddFile]] and [[RemoveFile]].
val dataToWrite =
if (
containsDataFilters && CDCReader.isCDCEnabledOnTable(txn.metadata) &&
sparkSession.conf.get(DeltaSQLConf.REPLACEWHERE_DATACOLUMNS_WITH_CDF_ENABLED) &&
cdcExistsInRemoveOp
) {
var dataWithDefaultExprs = data
// Pack new data and CDC data into an array of structs and unpack them into rows
// to share values in outputCols on both branches, avoiding re-evaluating
// non-deterministic expressions twice.
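// Illustrative (hypothetical) shape of the transform: an input row (a, b) becomes two
// packed structs, (a, b, "insert") feeding the CDF stream and (a, b, CDC_TYPE_NOT_CDC)
// feeding the main data, which explode() then unpacks into two output rows.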
val outputCols = dataWithDefaultExprs.schema.map(SchemaUtils.fieldToColumn(_))
val insertCols = outputCols :+
lit(CDCReader.CDC_TYPE_INSERT).as(CDCReader.CDC_TYPE_COLUMN_NAME)
val insertDataCols = outputCols :+
new Column(CDCReader.CDC_TYPE_NOT_CDC)
.as(CDCReader.CDC_TYPE_COLUMN_NAME)
val packedInserts = array(
struct(insertCols: _*),
struct(insertDataCols: _*)
).expr
dataWithDefaultExprs
.select(explode(new Column(packedInserts)).as("packedData"))
.select((dataWithDefaultExprs.schema.map(_.name) :+ CDCReader.CDC_TYPE_COLUMN_NAME)
.map(n => col(s"packedData.`$n`").as(n)): _*)
} else {
data
}
val newFiles =
try txn.writeFiles(dataToWrite, Some(options), constraints)
catch {
case e: InvariantViolationException =>
throw DeltaErrors.replaceWhereMismatchException(options.replaceWhere.get, e)
}
(newFiles, newFiles.collect { case a: AddFile => a }, removedFileActions)
case (SaveMode.Overwrite, None) =>
val newFiles = txn.writeFiles(data, Some(options))
val addFiles = newFiles.collect { case a: AddFile => a }
val deletedFiles = if (useDynamicPartitionOverwriteMode) {
// With dynamic partition overwrite, all existing data in any partition that is being
// written to will be deleted. The next two lines select which files to delete.
val updatePartitions = addFiles.map(_.partitionValues).toSet
txn.filterFiles(updatePartitions).map(_.remove)
} else {
txn.filterFiles().map(_.remove)
}
(newFiles, addFiles, deletedFiles)
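// Remaining modes (e.g. plain append) go through the ClickHouse MergeTree writer,
// which needs the MergeTree-specific layout: order-by key, primary key, bucketing and
// table configs.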
case _ =>
val newFiles = MergeTreeDeltaTxnWriter
.writeFiles(
txn,
data,
Some(options),
writeOptions,
database,
tableName,
orderByKeyOption,
primaryKeyOption,
clickhouseTableConfigs,
partitionColumns,
bucketSpec,
Seq.empty)
(newFiles, newFiles.collect { case a: AddFile => a }, Nil)
}
// Need to handle replaceWhere metrics separately.
if (
replaceWhere.nonEmpty && replaceOnDataColsEnabled &&
sparkSession.conf.get(DeltaSQLConf.REPLACEWHERE_METRICS_ENABLED)
) {
registerReplaceWhereMetrics(sparkSession, txn, newFiles, deletedFiles)
}
val fileActions = if (rearrangeOnly) {
val changeFiles = newFiles.collect { case c: AddCDCFile => c }
if (changeFiles.nonEmpty) {
throw DeltaErrors.unexpectedChangeFilesFound(changeFiles.mkString("\n"))
}
addFiles.map(_.copy(dataChange = !rearrangeOnly)) ++
deletedFiles.map {
case add: AddFile => add.copy(dataChange = !rearrangeOnly)
case remove: RemoveFile => remove.copy(dataChange = !rearrangeOnly)
case other => throw DeltaErrors.illegalFilesFound(other.toString)
}
} else {
newFiles ++ deletedFiles
}
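// Record the idempotent-writer transaction identifier (a SetTransaction action built
// from the writer's application id and version, when provided) ahead of the file
// actions.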
val setTxns = createSetTransaction()
setTxns.toSeq ++ fileActions
}