def writeFiles()

in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeDeltaTxnWriter.scala [218:354]


  def writeFiles(
      txn: OptimisticTransaction,
      inputData: Dataset[_],
      deltaOptions: Option[DeltaOptions],
      writeOptions: Map[String, String],
      database: String,
      tableName: String,
      orderByKeyOption: Option[Seq[String]],
      primaryKeyOption: Option[Seq[String]],
      clickhouseTableConfigs: Map[String, String],
      partitionColumns: Seq[String],
      bucketSpec: Option[BucketSpec],
      additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
    // use reflection to set the protected field: hasWritten
    setOptimisticTransactionHasWritten(txn)
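    // (A sketch of what that helper is assumed to do; this writer bypasses txn.writeFiles,
    // where Delta would normally flip the flag itself, so the real helper lives elsewhere in
    // this backend and may differ:)
    //   val field = txn.getClass.getDeclaredFields.find(_.getName.endsWith("hasWritten")).get
    //   field.setAccessible(true)
    //   field.setBoolean(txn, true)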

    val deltaLog = txn.deltaLog
    val metadata = txn.metadata

    val spark = inputData.sparkSession
    val (data, partitionSchema) = performCDCPartition(txn, inputData)
    val outputPath = deltaLog.dataPath

    val (queryExecution, output, generatedColumnConstraints, _) =
      normalizeData(txn, metadata, deltaLog, data)
    val partitioningColumns = getPartitioningColumns(partitionSchema, output)

    val committer = new MergeTreeCommitProtocol("delta-mergetree", outputPath.toString, None)

    // If Statistics Collection is enabled, then create a stats tracker that will be injected during
    // the FileFormatWriter.write call below and will collect per-file stats using
    // StatisticsCollection
    // val (optionalStatsTracker, _) = getOptionalStatsTrackerAndStatsCollection(output, outputPath,
    //   partitionSchema, data)
    val (optionalStatsTracker, _) = (None, None)
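    // Since the tracker is disabled here, the AddFile actions returned at the end of this
    // method carry no per-file statistics (the corresponding stats back-fill of
    // committer.addedStatuses is likewise commented out below).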

    val constraints =
      Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints

    SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)

      val queryPlan = queryExecution.executedPlan
      val newQueryPlan = queryPlan match {
        // If the child is columnar, we can just wrap and transfer the columnar data
        case c2r: ColumnarToRowExecBase =>
          FakeRowAdaptor(c2r.child)
        // If the child is AQE, we make AQE "support columnar",
        // then AQE itself will guarantee to generate columnar outputs.
        // So FakeRowAdaptor will always consume columnar data,
        // thus avoiding the c2r -> aqe -> r2c -> writer case.
        case aqe: AdaptiveSparkPlanExec =>
          FakeRowAdaptor(
            AdaptiveSparkPlanExec(
              aqe.inputPlan,
              aqe.context,
              aqe.preprocessingRules,
              aqe.isSubquery,
              supportsColumnar = true
            ))
        case other => queryPlan.withNewChildren(Array(FakeRowAdaptor(other)))
      }
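      // FakeRowAdaptor is Gluten's adaptor that lets columnar batches pass through the
      // row-based writer interface, so the native MergeTree writer downstream receives
      // columnar data without an extra columnar-to-row conversion.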

      val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

      if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
        val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
          new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
          BasicWriteJobStatsTracker.metrics)
        // registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
        statsTrackers.append(basicWriteJobStatsTracker)
      }

      // Retain only a minimal selection of Spark writer options to avoid any potential
      // compatibility issues
      val options = writeOptions.filterKeys {
        key =>
          key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
          key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
      }.toMap
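      // For example, a writeOptions map of Map("maxRecordsPerFile" -> "5000000",
      // "mergeSchema" -> "true") would be reduced here to only the maxRecordsPerFile entry.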

      try {
        MergeTreeFileFormatWriter.write(
          sparkSession = spark,
          plan = newQueryPlan,
          fileFormat = new DeltaMergeTreeFileFormat(
            metadata,
            database,
            tableName,
            output,
            orderByKeyOption,
            primaryKeyOption,
            clickhouseTableConfigs,
            partitionColumns),
          // TODO: doesn't support changing formats.
          committer = committer,
          outputSpec = outputSpec,
          // scalastyle:off deltahadoopconfiguration
          hadoopConf =
            spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
          // scalastyle:on deltahadoopconfiguration
          orderByKeyOption = orderByKeyOption,
          primaryKeyOption = primaryKeyOption,
          partitionColumns = partitioningColumns,
          bucketSpec = bucketSpec,
          statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
          options = options,
          constraints = constraints
        )
      } catch {
        case s: SparkException =>
          // Pull an InvariantViolationException up to the top level if it was the root cause.
          val violationException = ExceptionUtils.getRootCause(s)
          if (violationException.isInstanceOf[InvariantViolationException]) {
            throw violationException
          } else {
            throw s
          }
      }
    }

    // val resultFiles = committer.addedStatuses.map { a =>
    //   a.copy(stats = optionalStatsTracker.map(
    //    _.recordedStats(new Path(new URI(a.path)).getName)).getOrElse(a.stats))
    // }
    /* val resultFiles = committer.addedStatuses.filter {
      // In some cases, we can write out an empty `inputData`. Some examples of this (though, they
      // may be fixed in the future) are the MERGE command when you delete with empty source, or
      // empty target, or on disjoint tables. This is hard to catch before the write without
      // collecting the DF ahead of time. Instead, we can return only the AddFiles that
      // a) actually add rows, or
      // b) don't have any stats so we don't know the number of rows at all
      case a: AddFile => a.numLogicalRecords.forall(_ > 0)
      case _ => true
    } */

    committer.addedStatuses.toSeq ++ committer.changeFiles
  }
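
A hypothetical invocation sketch (the enclosing MergeTreeDeltaTxnWriter object name, the
transaction wiring, and all literal values below are illustrative assumptions, not taken from
this file's callers); the returned FileActions are what the caller would hand to txn.commit:

  deltaLog.withNewTransaction { txn =>
    val newFiles = MergeTreeDeltaTxnWriter.writeFiles(
      txn = txn,
      inputData = df,
      deltaOptions = None,
      writeOptions = Map.empty,
      database = "default",
      tableName = "events",
      orderByKeyOption = Some(Seq("event_time")),
      primaryKeyOption = None,
      clickhouseTableConfigs = Map.empty,
      partitionColumns = Seq("dt"),
      bucketSpec = None,
      additionalConstraints = Nil)
    txn.commit(newFiles, DeltaOperations.Write(SaveMode.Append, Some(Seq("dt"))))
  }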