def write()

in backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v1/clickhouse/commands/WriteMergeTreeToDelta.scala [107:300]


  def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
    import sparkSession.implicits._
    if (txn.readVersion > -1) {
      // This table already exists, check if the insert is valid.
      if (mode == SaveMode.ErrorIfExists) {
        throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
      } else if (mode == SaveMode.Ignore) {
        return Nil
      } else if (mode == SaveMode.Overwrite) {
        deltaLog.assertRemovable()
      }
    }
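    // (SaveMode.Append needs no extra validation here: appended data is handled by the
    // default case of the write-mode match below.)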
    val rearrangeOnly = options.rearrangeOnly
    // Delta does not support char padding and we should only have varchar type. This does not
    // change the actual behavior, but makes DESC TABLE show varchar instead of char.
    val dataSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
      replaceCharWithVarchar(CharVarcharUtils.getRawSchema(data.schema)).asInstanceOf[StructType])
    val finalSchema = schemaInCatalog.getOrElse(dataSchema)
    updateMetadata(
      data.sparkSession,
      txn,
      finalSchema,
      partitionColumns,
      configuration,
      isOverwriteOperation,
      rearrangeOnly)

    val replaceOnDataColsEnabled =
      sparkSession.conf.get(DeltaSQLConf.REPLACEWHERE_DATACOLUMNS_ENABLED)

    val useDynamicPartitionOverwriteMode = {
      if (txn.metadata.partitionColumns.isEmpty) {
        // We ignore dynamic partition overwrite mode for non-partitioned tables
        false
      } else if (options.replaceWhere.nonEmpty) {
        if (options.partitionOverwriteModeInOptions && options.isDynamicPartitionOverwriteMode) {
          // replaceWhere and dynamic partition overwrite conflict because they both specify which
          // data to overwrite. We throw an error when:
          // 1. replaceWhere is provided in a DataFrameWriter option
          // 2. partitionOverwriteMode is set to "dynamic" in a DataFrameWriter option
          throw DeltaErrors.replaceWhereUsedWithDynamicPartitionOverwrite()
        } else {
          // If replaceWhere is provided, we do not use dynamic partition overwrite, even if
          // it's enabled in the Spark session configuration, since query-specific configs
          // generally take precedence over session configs.
          false
        }
      } else options.isDynamicPartitionOverwriteMode
    }
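    // For reference, dynamic partition overwrite is typically requested either via the
    // session conf spark.sql.sources.partitionOverwriteMode=dynamic or per query, e.g.
    // (illustrative, not part of this file):
    //   df.write.format("delta").mode("overwrite")
    //     .option("partitionOverwriteMode", "dynamic")
    //     .save(path)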

    // Validate partition predicates
    var containsDataFilters = false
    val replaceWhere = options.replaceWhere.flatMap {
      replace =>
        val parsed = parsePredicates(sparkSession, replace)
        if (replaceOnDataColsEnabled) {
          // Split the predicate into partition (metadata) predicates and data filters.
          val (metadataPredicates, dataFilters) = DeltaTableUtils.splitMetadataAndDataPredicates(
            parsed.head,
            txn.metadata.partitionColumns,
            sparkSession)
          if (rearrangeOnly && dataFilters.nonEmpty) {
            throw DeltaErrors.replaceWhereWithFilterDataChangeUnset(dataFilters.mkString(","))
          }
          containsDataFilters = dataFilters.nonEmpty
          Some(metadataPredicates ++ dataFilters)
        } else if (mode == SaveMode.Overwrite) {
          verifyPartitionPredicates(sparkSession, txn.metadata.partitionColumns, parsed)
          Some(parsed)
        } else {
          None
        }
    }
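    // Illustrative example (assumed table layout): for a table partitioned by `date`,
    //   .option("replaceWhere", "date = '2024-01-01' AND id > 10")
    // splits into the metadata predicate `date = '2024-01-01'` and the data filter
    // `id > 10`; data filters are only accepted when REPLACEWHERE_DATACOLUMNS_ENABLED is set.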

    if (txn.readVersion < 0) {
      // Initialize the log path
      deltaLog.createLogDirectory()
    }

    val (newFiles, addFiles, deletedFiles) = (mode, replaceWhere) match {
      case (SaveMode.Overwrite, Some(predicates)) if !replaceOnDataColsEnabled =>
        // Fall back to matching on partition columns only when arbitrary replaceWhere
        // (on data columns) is disabled.
        val newFiles = txn.writeFiles(data, Some(options))
        val addFiles = newFiles.collect { case a: AddFile => a }
        // Check to make sure the files we wrote out were actually valid.
        val matchingFiles = DeltaLog
          .filterFileList(txn.metadata.partitionSchema, addFiles.toDF(), predicates)
          .as[AddFile]
          .collect()
        val invalidFiles = addFiles.toSet -- matchingFiles
        if (invalidFiles.nonEmpty) {
          val badPartitions = invalidFiles
            .map(_.partitionValues)
            .map {
              _.map { case (k, v) => s"$k=$v" }.mkString("/")
            }
            .mkString(", ")
          throw DeltaErrors.replaceWhereMismatchException(options.replaceWhere.get, badPartitions)
        }
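        // Every existing file matching the predicates is removed; together with the
        // AddFiles above this overwrites exactly the matching partitions.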
        (newFiles, addFiles, txn.filterFiles(predicates).map(_.remove))
      case (SaveMode.Overwrite, Some(condition)) if txn.snapshot.version >= 0 =>
        val constraints = extractConstraints(sparkSession, condition)
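        // The replaceWhere condition becomes write constraints, so rows falling outside
        // the replaced region fail the write (surfaced as the mismatch error below).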

        val removedFileActions = removeFiles(sparkSession, txn, condition)
        val cdcExistsInRemoveOp = removedFileActions.exists(_.isInstanceOf[AddCDCFile])

        // The above REMOVE will not produce explicit CDF data when persistent deletion
        // vectors (DVs) are enabled. Therefore we need to decide here whether to produce
        // explicit CDF for the INSERTs, because the CDF protocol requires that either
        // (i) all CDF data is generated explicitly as AddCDCFile,
        // or (ii) all CDF data can be deduced from [[AddFile]] and [[RemoveFile]].
        val dataToWrite =
          if (
            containsDataFilters && CDCReader.isCDCEnabledOnTable(txn.metadata) &&
            sparkSession.conf.get(DeltaSQLConf.REPLACEWHERE_DATACOLUMNS_WITH_CDF_ENABLED) &&
            cdcExistsInRemoveOp
          ) {
            var dataWithDefaultExprs = data

            // Pack new data and CDC data into an array of structs and unpack them into rows
            // to share values in outputCols across both branches, avoiding evaluating
            // non-deterministic expressions twice.
            val outputCols = dataWithDefaultExprs.schema.map(SchemaUtils.fieldToColumn(_))
            val insertCols = outputCols :+
              lit(CDCReader.CDC_TYPE_INSERT).as(CDCReader.CDC_TYPE_COLUMN_NAME)
            val insertDataCols = outputCols :+
              new Column(CDCReader.CDC_TYPE_NOT_CDC)
                .as(CDCReader.CDC_TYPE_COLUMN_NAME)
            val packedInserts = array(
              struct(insertCols: _*),
              struct(insertDataCols: _*)
            ).expr

            dataWithDefaultExprs
              .select(explode(new Column(packedInserts)).as("packedData"))
              .select((dataWithDefaultExprs.schema.map(_.name) :+ CDCReader.CDC_TYPE_COLUMN_NAME)
                .map(n => col(s"packedData.`$n`").as(n)): _*)
          } else {
            data
          }
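        // Net effect of the packing above (illustrative): an input row (a, b) becomes
        // array(struct(a, b, CDC_TYPE_INSERT), struct(a, b, CDC_TYPE_NOT_CDC)); after
        // explode, each input row yields one CDC row plus one data row, with outputCols
        // evaluated only once.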
        val newFiles =
          try txn.writeFiles(dataToWrite, Some(options), constraints)
          catch {
            case e: InvariantViolationException =>
              throw DeltaErrors.replaceWhereMismatchException(options.replaceWhere.get, e)
          }
        (newFiles, newFiles.collect { case a: AddFile => a }, removedFileActions)
      case (SaveMode.Overwrite, None) =>
        val newFiles = txn.writeFiles(data, Some(options))
        val addFiles = newFiles.collect { case a: AddFile => a }
        val deletedFiles = if (useDynamicPartitionOverwriteMode) {
          // With dynamic partition overwrite, all existing data is deleted in every
          // partition that the write touches. The next two lines select which files
          // to delete.
          val updatePartitions = addFiles.map(_.partitionValues).toSet
          txn.filterFiles(updatePartitions).map(_.remove)
        } else {
          txn.filterFiles().map(_.remove)
        }
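        // Example (illustrative): if the new files land only in partition date=2024-01-01,
        // dynamic mode removes just that partition's existing files, while the else branch
        // above removes every file in the table.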
        (newFiles, addFiles, deletedFiles)
      case _ =>
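        // Append path (covers SaveMode.Append and initial writes): delegate to the
        // ClickHouse MergeTree writer so the data lands as MergeTree parts.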
        val newFiles = MergeTreeDeltaTxnWriter
          .writeFiles(
            txn,
            data,
            Some(options),
            writeOptions,
            database,
            tableName,
            orderByKeyOption,
            primaryKeyOption,
            clickhouseTableConfigs,
            partitionColumns,
            bucketSpec,
            Seq.empty)
        (newFiles, newFiles.collect { case a: AddFile => a }, Nil)
    }

    val fileActions = if (rearrangeOnly) {
      val changeFiles = newFiles.collect { case c: AddCDCFile => c }
      if (changeFiles.nonEmpty) {
        throw DeltaErrors.unexpectedChangeFilesFound(changeFiles.mkString("\n"))
      }
      addFiles.map(_.copy(dataChange = !rearrangeOnly)) ++
        deletedFiles.map {
          case add: AddFile => add.copy(dataChange = !rearrangeOnly)
          case remove: RemoveFile => remove.copy(dataChange = !rearrangeOnly)
          case other => throw DeltaErrors.illegalFilesFound(other.toString)
        }
    } else {
      newFiles ++ deletedFiles
    }
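    // With rearrangeOnly, dataChange = false marks these actions as pure rearrangement of
    // existing data (no logical change), so e.g. streaming reads can skip them.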
    val setTxns = createSetTransaction()
    setTxns.toSeq ++ fileActions
  }
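
A minimal sketch of a call site (assumed, not from this file): the returned actions are
intended to be committed on the same transaction, in the same way Delta's WriteIntoDelta
commits its write. DeltaOperations.Write and deltaLog.withNewTransaction are standard
Delta APIs; mode, partitionColumns, and options are this command's fields.

  deltaLog.withNewTransaction { txn =>
    // Produce the AddFile/RemoveFile/SetTransaction actions for this write ...
    val actions = write(txn, sparkSession)
    // ... and commit them as a single atomic Delta version.
    txn.commit(actions, DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere))
  }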