private def writeInternal()

in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala [214:549]


  /**
   * Executes one datasource write against the Hudi table located at `optParams("path")`.
   *
   * Flow, in order:
   *  1. Resolve the base path, record whether the table's meta folder exists, and validate the
   *     key-generator / table configs against the supplied options.
   *  2. Merge the options with the table config, require Kryo as `spark.serializer` and deduce the
   *     write operation.
   *  3. For `SaveMode.Ignore` on an existing table: log a warning and return without writing.
   *  4. Otherwise apply the save-mode handling, load (existing table) or initialize (new table) the
   *     table meta client, and dispatch on the operation:
   *     DELETE / DELETE_PREPPED, DELETE_PARTITION, or any other write (with a short-circuit into
   *     `bulkInsertAsRow` when the row writer is enabled for BULK_INSERT).
   *  5. Commit through `commitAndPerformPostOperations` and always pass the write client to
   *     `handleWriteClientClosure` afterwards.
   *
   * Once a write client has been obtained, any non-fatal failure before the commit step also routes
   * the client through `handleWriteClientClosure` before the original exception is rethrown.
   *
   * @param sqlContext               provides the Spark session and Spark context used for the write
   * @param mode                     Spark save mode
   * @param optParams                user supplied write options; must contain a non-empty `path`
   * @param sourceDf                 the incoming data
   * @param streamingWritesParamsOpt optional streaming parameters (table config, async compaction /
   *                                 clustering triggers, extra pre-commit function)
   * @param hoodieWriteClient        optional pre-built write client; when empty a new one is created
   * @return (write successful, instant time, compaction instant, clustering instant, write client,
   *         table config); in the `SaveMode.Ignore` short-circuit the flag is `false`, the three
   *         instants are empty and the client is `hoodieWriteClient.orNull`
   * @throws AssertionError                if `path` or the table name is not set
   * @throws HoodieException               if `spark.serializer` is not Kryo, or a delete targets a
   *                                       table that does not exist
   * @throws UnsupportedOperationException if a SPARK record merger is used on a MERGE_ON_READ table
   *                                       whose log block format is not parquet
   * @throws HoodieRecordCreationException if the incoming rows cannot be converted to Hoodie records
   */
  private def writeInternal(sqlContext: SQLContext,
                            mode: SaveMode,
                            optParams: Map[String, String],
                            sourceDf: DataFrame,
                            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
                            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {

    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
    val path = optParams("path")
    val basePath = new Path(path)

    val spark = sqlContext.sparkSession
    val sparkContext = sqlContext.sparkContext

    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
    var tableConfig = getHoodieTableConfig(sparkContext, path, mode,
      streamingWritesParamsOpt.map(_.hoodieTableConfigOpt).getOrElse(Option.empty))
    // get params w/o injecting default and validate
    val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
    val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
      originKeyGeneratorClassName, paramsWithoutDefaults)

    // Validate datasource and tableconfig keygen are the same
    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

    // Computed once and reused below when deciding whether to switch the scheduler pool.
    val hasAsyncCompactionTrigger = streamingWritesParamsOpt.exists(_.asyncCompactionTriggerFn.isDefined)
    asyncCompactionTriggerFnDefined = hasAsyncCompactionTrigger
    asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.exists(_.asyncClusteringTriggerFn.isDefined)
    // re-use table configs and inject defaults.
    var (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
    val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
    val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
      s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
    val tableIdentifier = TableIdentifier(tblName, if (databaseName.isEmpty) None else Some(databaseName))

    assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
      s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")

    sparkContext.getConf.getOption("spark.serializer") match {
      case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
      case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
    }
    val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
    val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults, sourceDf)

    val preppedSparkSqlMergeInto = parameters.getOrElse(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
    val preppedSparkSqlWrites = parameters.getOrElse(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean
    val preppedWriteOperation = canDoPreppedWrites(hoodieConfig, parameters, operation, sourceDf)

    val jsc = new JavaSparkContext(sparkContext)
    // With an async compaction trigger and a scheduler allocation file configured, run this
    // thread's jobs in the datasource writer pool.
    if (hasAsyncCompactionTrigger
      && jsc.getConf.getOption(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
      jsc.setLocalProperty("spark.scheduler.pool", SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
    }

    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(hoodieConfig.getProps))
    val tableVersion = Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION))
    if (mode == SaveMode.Ignore && tableExists) {
      log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
      (false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
    } else {
      // Handle various save modes
      handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
      val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters), false)
      val partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters), HoodieTableVersion.fromVersionCode(tableVersion))
      val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
      val tableMetaClient = if (tableExists) {
        // Existing table: load its meta client from storage.
        HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
        HoodieTableMetaClient.builder
          .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration))
          .setBasePath(path)
          .build()
      } else {
        // New table: initialize it from the merged write configs.
        val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
        val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_HISTORY_PATH)
        val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
        val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)
        val payloadClass = hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)
        val recordMergeStrategyId = hoodieConfig.getString(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID)
        val keyGenProp =
          if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
            hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
          else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
        HoodieTableMetaClient.newTableBuilder()
          .setTableType(tableType)
          .setTableVersion(tableVersion)
          .setDatabaseName(databaseName)
          .setTableName(tblName)
          .setBaseFileFormat(baseFileFormat)
          .setArchiveLogFolder(archiveLogFolder)
          // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
          // but we are interested in what user has set, hence fetching from optParams.
          .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
          .setPartitionFields(partitionColumnsForKeyGenerator)
          .setPopulateMetaFields(populateMetaFields)
          .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
          .setSecondaryKeyFields(hoodieConfig.getString(SECONDARYKEY_COLUMN_NAME))
          .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
          .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
          .setKeyGeneratorClassProp(keyGenProp)
          .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]])
          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
          .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
          .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
          .setCommitTimezone(timelineTimeZone)
          .setPayloadClassName(payloadClass)
          .setRecordMergeStrategyId(recordMergeStrategyId)
          .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
          .setMultipleBaseFileFormatsEnabled(hoodieConfig.getBoolean(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE))
          .initTable(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration), path)
      }

      // take care of partition level bucket index which is simple bucket index.
      // for BUCKET_RESCALE action will set related configs in call command, so skip here.
      if (hoodieConfig.getStringOrDefault(HoodieIndexConfig.INDEX_TYPE, "") == HoodieIndex.IndexType.BUCKET.name
        && PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(tableMetaClient.getStorageConf, basePath.toString)
        && hoodieConfig.getStringOrDefault(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE, "") != WriteOperationType.BUCKET_RESCALE.value()) {

        val latestHashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(tableMetaClient)
        hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key, latestHashingConfig.getExpressions)
        hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, latestHashingConfig.getDefaultBucketNumber.toString)
        hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key, latestHashingConfig.getRule)
        // Mirror the same three settings into the plain parameter map.
        parameters = parameters ++ Map(
          HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key -> latestHashingConfig.getExpressions,
          HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key -> latestHashingConfig.getDefaultBucketNumber.toString,
          HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key -> latestHashingConfig.getRule)
      }

      var instantTime: String = null
      // From here on use the table config of the (possibly just initialized) table.
      tableConfig = tableMetaClient.getTableConfig

      val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)

      // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
      sparkContext.getConf.registerKryoClasses(
        Array(classOf[GenericData],
          classOf[Schema]))

      val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
      val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
      // Prepped and streaming inputs are used as-is; otherwise the Hudi meta columns are dropped.
      val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {
        sourceDf
      } else {
        sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala.toSeq: _*)
      }
      // NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
      //       Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
      //       play crucial role in establishing compatibility b/w schemas
      val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s => (s.getName, s.getNamespace))
        .getOrElse(getAvroRecordNameAndNamespace(tblName))

      val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
      val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
        // In case we need to reconcile the schema and schema evolution is enabled,
        // we will force-apply schema evolution to the writer's schema
        if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
          val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
          Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
        } else {
          None
        }
      }

      val (writeResult: HoodieWriteResult, writeClient: SparkRDDWriteClient[_]) =
        operation match {
          case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
            mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, hoodieConfig)
            val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace)
            // Convert to RDD[HoodieKey]
            val hoodieKeysAndLocationsToDelete = genericRecords.mapPartitions(it => {
              // No key generator is created for prepped writes.
              val keyGenerator: Option[BaseKeyGenerator] = if (preppedSparkSqlWrites || preppedWriteOperation) {
                None
              } else {
                Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(hoodieConfig.getProps))
                  .asInstanceOf[BaseKeyGenerator])
              }
              it.map { avroRec =>
                HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator, avroRec, preppedSparkSqlWrites || preppedWriteOperation, preppedSparkSqlWrites || preppedSparkSqlMergeInto || preppedWriteOperation)
              }
            }).toJavaRDD()

            if (!tableExists) {
              throw new HoodieException(s"hoodie table at $basePath does not exist")
            }

            // Create a HoodieWriteClient & issue the delete.
            // Only the table's own internal schema is used here (no reconcile fallback as in the outer internalSchemaOpt).
            val tableInternalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
            val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
                null, path, tblName,
                (addSchemaEvolutionParameters(parameters, tableInternalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
              .asInstanceOf[SparkRDDWriteClient[_]]

            if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
              streamingWritesParamsOpt.foreach(params => params.asyncCompactionTriggerFn.foreach(triggerFn => triggerFn.apply(client)))
            }
            if (isAsyncClusteringEnabled(client, parameters)) {
              streamingWritesParamsOpt.foreach(params => params.asyncClusteringTriggerFn.foreach(triggerFn => triggerFn.apply(client)))
            }

            try {
              instantTime = client.createNewInstantTime()
              // Issue deletes
              client.startCommitWithTime(instantTime, commitActionType)
              val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysAndLocationsToDelete, instantTime, preppedSparkSqlWrites || preppedWriteOperation)
              (writeStatuses, client)
            } catch {
              case scala.util.control.NonFatal(e) =>
                // Do not leak the client when the delete fails before the commit step.
                handleWriteClientClosure(client, tableConfig, parameters, jsc.hadoopConfiguration())
                throw e
            }

          case WriteOperationType.DELETE_PARTITION =>
            if (!tableExists) {
              throw new HoodieException(s"hoodie table at $basePath does not exist")
            }

            // NOTE: this branch builds its own key generator and meta client; both shadow the outer vals.
            val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(hoodieConfig.getProps))
            val tableMetaClient = HoodieTableMetaClient.builder
              .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration))
              .setBasePath(basePath.toString).build()
            // Get list of partitions to delete
            val partitionsToDelete = if (parameters.contains(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
              // Explicit comma-separated list from the options, with wildcards resolved.
              val partitionColsToDelete = parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
              java.util.Arrays.asList(resolvePartitionWildcards(java.util.Arrays.asList(partitionColsToDelete: _*).asScala.toList, jsc,
                tableMetaClient.getStorage, hoodieConfig, basePath.toString): _*)
            } else {
              // Otherwise: the distinct partition paths of the incoming records.
              val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace)
              genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
            }

            // Issue the delete.
            val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
            val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
                schemaStr, path, tblName,
                (parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
              .asInstanceOf[SparkRDDWriteClient[_]]
            try {
              // Issue delete partitions
              instantTime = client.createNewInstantTime()
              client.startCommitWithTime(instantTime, commitActionType)
              val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
              (writeStatuses, client)
            } catch {
              case scala.util.control.NonFatal(e) =>
                // Do not leak the client when the partition delete fails before the commit step.
                handleWriteClientClosure(client, tableConfig, parameters, jsc.hadoopConfiguration())
                throw e
            }

          // Here all other (than DELETE, DELETE_PARTITION) write operations are handled
          case _ =>
            // NOTE: Target writer's schema is deduced based on
            //         - Source's schema
            //         - Existing table's schema (including its Hudi's [[InternalSchema]] representation)
            val writerSchema = HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)

            validateSchemaForHoodieIsDeleted(writerSchema)
            mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, hoodieConfig)

            // Check whether partition columns should be persisted w/in the data-files, or should
            // be instead omitted from them and simply encoded into the partition path (which is Spark's
            // behavior by default)
            // TODO move partition columns handling down into the handlers
            val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
            val dataFileSchema = if (shouldDropPartitionColumns) {
              val truncatedSchema = generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
              // NOTE: We have to register this schema w/ Kryo to make sure it's able to apply an optimization
              //       allowing it to avoid the need to ser/de the whole schema along _every_ Avro record
              registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
              truncatedSchema
            } else {
              writerSchema
            }

            // Remove meta columns from writerSchema if isPrepped is true.
            val processedDataSchema = if (preppedSparkSqlWrites || preppedSparkSqlMergeInto || preppedWriteOperation) {
              HoodieAvroUtils.removeMetadataFields(dataFileSchema)
            } else {
              dataFileSchema
            }

            // Create a HoodieWriteClient & issue the write.
            val client = hoodieWriteClient.getOrElse {
              val finalOpts = addSchemaEvolutionParameters(parameters, internalSchemaOpt, Some(writerSchema)) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key
              // TODO(HUDI-4772) proper writer-schema has to be specified here
              DataSourceUtils.createHoodieClient(jsc, processedDataSchema.toString, path, tblName, finalOpts.asJava)
            }

            if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
              streamingWritesParamsOpt.foreach(params => params.asyncCompactionTriggerFn.foreach(triggerFn => triggerFn.apply(client)))
            }

            if (isAsyncClusteringEnabled(client, parameters)) {
              streamingWritesParamsOpt.foreach(params => params.asyncClusteringTriggerFn.foreach(triggerFn => triggerFn.apply(client)))
            }

            // Short-circuit if bulk_insert via row is enabled.
            // NOTE(review): kept outside the try below on purpose, so the client is not handed to
            //               handleWriteClientClosure twice if bulkInsertAsRow already manages it — confirm.
            // scalastyle:off
            if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
              return bulkInsertAsRow(client, parameters, hoodieConfig, df, mode, tblName, basePath, writerSchema, tableConfig)
            }
            // scalastyle:on

            try {
              val writeConfig = client.getConfig
              if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
                throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
              }
              instantTime = client.createNewInstantTime()
              // Convert to RDD[HoodieRecord]
              val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
                HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName,
                  avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites,
                  preppedSparkSqlMergeInto, preppedWriteOperation))) match {
                case Success(recs) => recs
                case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
              }

              // Remove duplicates from incoming records based on existing keys from storage.
              val dedupedHoodieRecords = handleInsertDuplicates(hoodieRecords, hoodieConfig, operation, jsc, parameters)
              client.startCommitWithTime(instantTime, commitActionType)
              val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
                preppedSparkSqlWrites || preppedWriteOperation)
              (writeResult, client)
            } catch {
              case scala.util.control.NonFatal(e) =>
                // close the write client in all cases
                handleWriteClientClosure(client, tableConfig, parameters, jsc.hadoopConfiguration())
                throw e
            }
        }

      // Check for errors and commit the write.
      try {
        val (writeSuccessful, compactionInstant, clusteringInstant) =
          commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
            writeResult, parameters, writeClient, tableConfig, jsc,
            TableInstantInfo(basePath, instantTime, commitActionType, operation),
            streamingWritesParamsOpt.map(_.extraPreCommitFn).getOrElse(Option.empty))

        (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
      } finally {
        handleWriteClientClosure(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
      }
    }
  }