in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeDeltaTxnWriter.scala [218:354]
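/**
 * Writes `inputData` out as ClickHouse MergeTree parts within `txn` and returns
 * the resulting FileActions (AddFiles plus any CDC change files). This mirrors
 * Delta's TransactionalWrite.writeFiles, with the row-based write path replaced
 * by the columnar MergeTree path.
 */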
def writeFiles(
txn: OptimisticTransaction,
inputData: Dataset[_],
deltaOptions: Option[DeltaOptions],
writeOptions: Map[String, String],
database: String,
tableName: String,
orderByKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
clickhouseTableConfigs: Map[String, String],
partitionColumns: Seq[String],
bucketSpec: Option[BucketSpec],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
// Use reflection to set the protected `hasWritten` flag on the transaction;
// Delta's own TransactionalWrite.writeFiles would normally set it, and this
// method replaces that path.
setOptimisticTransactionHasWritten(txn)
val deltaLog = txn.deltaLog
val metadata = txn.metadata
val spark = inputData.sparkSession
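// Separate CDC rows from regular data: when Change Data Feed is enabled,
// performCDCPartition routes change rows into the dedicated CDC partition and
// extends the physical partition schema accordingly.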
val (data, partitionSchema) = performCDCPartition(txn, inputData)
val outputPath = deltaLog.dataPath
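// Normalize the query output to match the table schema; this also yields the
// constraints derived from generated-column expressions.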
val (queryExecution, output, generatedColumnConstraints, _) =
normalizeData(txn, metadata, deltaLog, data)
val partitioningColumns = getPartitioningColumns(partitionSchema, output)
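// The commit protocol records each written file (see `addedStatuses` and
// `changeFiles` below) so they can be returned as FileActions for the commit.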
val committer = new MergeTreeCommitProtocol("delta-mergetree", outputPath.toString, None)
// Per-file statistics collection is currently disabled: the stats tracker that
// would otherwise be injected into the write call below is stubbed out with None.
// val (optionalStatsTracker, _) = getOptionalStatsTrackerAndStatsCollection(output, outputPath,
//   partitionSchema, data)
val (optionalStatsTracker, _) = (None, None)
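// Gather everything that must hold for each written row: table-level CHECK
// constraints and invariants, generated-column constraints, and any extra
// constraints supplied by the caller.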
val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)
val queryPlan = queryExecution.executedPlan
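// Rewrite the physical plan so the writer receives columnar data wrapped in
// FakeRowAdaptor (in Gluten, a "fake" row carries a whole ColumnarBatch),
// instead of paying for a real columnar-to-row conversion.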
val newQueryPlan = queryPlan match {
// If the child is columnar, we can simply wrap it and pass the columnar data through.
case c2r: ColumnarToRowExecBase =>
FakeRowAdaptor(c2r.child)
// If the child is AQE, we mark AQE as supporting columnar output, so AQE itself
// guarantees columnar results. FakeRowAdaptor then always consumes columnar
// data, avoiding the c2r -> aqe -> r2c -> writer round trip.
case aqe: AdaptiveSparkPlanExec =>
FakeRowAdaptor(
AdaptiveSparkPlanExec(
aqe.inputPlan,
aqe.context,
aqe.preprocessingRules,
aqe.isSubquery,
supportsColumnar = true
))
case other => queryPlan.withNewChildren(Array(FakeRowAdaptor(other)))
}
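// Track basic write metrics (files, bytes and rows written) when Delta history
// metrics are enabled.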
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
BasicWriteJobStatsTracker.metrics)
// registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
statsTrackers.append(basicWriteJobStatsTracker)
}
// Retain only a minimal selection of Spark writer options to avoid any potential
// compatibility issues
val options = writeOptions.filterKeys {
key =>
key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
}.toMap
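// Hand the plan to the MergeTree-aware variant of Spark's FileFormatWriter,
// which threads the ordering/primary-key and bucketing information through to
// the MergeTree file format.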
try {
MergeTreeFileFormatWriter.write(
sparkSession = spark,
plan = newQueryPlan,
fileFormat = new DeltaMergeTreeFileFormat(
metadata,
database,
tableName,
output,
orderByKeyOption,
primaryKeyOption,
clickhouseTableConfigs,
partitionColumns),
committer = committer,
outputSpec = outputSpec,
// scalastyle:off deltahadoopconfiguration
hadoopConf =
spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
// scalastyle:on deltahadoopconfiguration
orderByKeyOption = orderByKeyOption,
primaryKeyOption = primaryKeyOption,
partitionColumns = partitioningColumns,
bucketSpec = bucketSpec,
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
constraints = constraints
)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
val violationException = ExceptionUtils.getRootCause(s)
if (violationException.isInstanceOf[InvariantViolationException]) {
throw violationException
} else {
throw s
}
}
}
// The upstream Delta implementation post-processes the committed files; both
// variants are kept here, commented out, for reference:
// val resultFiles = committer.addedStatuses.map { a =>
//   a.copy(stats = optionalStatsTracker.map(
//     _.recordedStats(new Path(new URI(a.path)).getName)).getOrElse(a.stats))
// }
/* val resultFiles = committer.addedStatuses.filter {
  // In some cases, we can write out an empty `inputData`. Some examples of this (though they
  // may be fixed in the future) are the MERGE command when you delete with an empty source,
  // an empty target, or on disjoint tables. This is hard to catch before the write without
  // collecting the DF ahead of time. Instead, we can return only the AddFiles that
  // a) actually add rows, or
  // b) don't have any stats, so we don't know the number of rows at all
  case a: AddFile => a.numLogicalRecords.forall(_ > 0)
  case _ => true
} */
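// Return every AddFile recorded by the committer plus any CDC change files, so
// the caller can commit them as part of the transaction.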
committer.addedStatuses.toSeq ++ committer.changeFiles
}