def buildHoodieInsertConfig()

in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala [166:311]


  def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
                              sparkSession: SparkSession,
                              isOverwritePartition: Boolean,
                              isOverwriteTable: Boolean,
                              insertPartitions: Map[String, Option[String]] = Map.empty,
                              extraOptions: Map[String, String],
                              staticOverwritePartitionPathOpt: Option[String] = Option.empty): Map[String, String] = {

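    // Statically specified insert partitions must exactly match the table's defined partition fields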
    if (insertPartitions.nonEmpty &&
      (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
      throw new IllegalArgumentException(s"Insert partition fields " +
        s"[${insertPartitions.keys.mkString(",")}]" +
        s" are not equal to the partition fields defined in the table [${hoodieCatalogTable.partitionFields.mkString(",")}]")
    }
    val path = hoodieCatalogTable.tableLocation
    val tableType = hoodieCatalogTable.tableTypeName
    val tableConfig = hoodieCatalogTable.tableConfig

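    // Resolve the effective write options from the table config and the session's SQL conf,
    // letting user-supplied extraOptions override both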
    val combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
      defaultOpts = Map.empty, overridingOpts = extraOptions)
    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig, extraOptions)

    val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")

    // NOTE: Here we fall back to "" to make sure that a null value is not overridden with
    // the default value ("ts")
    // TODO(HUDI-3456) clean up
    val preCombineField = combinedOpts.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key,
      combinedOpts.getOrElse(PRECOMBINE_FIELD.key, ""))

    val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
    val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
    val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
      .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)

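    // Legacy bulk-insert toggle, the session-level drop-duplicates setting, and whether
    // record keys should be auto-generated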
    val enableBulkInsert = combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
    val dropDuplicate = sparkSession.conf
      .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
    val shouldAutoKeyGen: Boolean = shouldAutoGenerateRecordKeys(combinedOpts)

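    // Read both the legacy insert-mode config and the newer sql write-operation config,
    // tracking whether each was explicitly set so their precedence can be decided below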
    val insertMode = InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
      DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
    val insertModeSet = combinedOpts.contains(SQL_INSERT_MODE.key)
    val sparkSqlInsertIntoOperationOpt = combinedOpts.get(SPARK_SQL_INSERT_INTO_OPERATION.key())
    val sparkSqlInsertIntoOperationSet = sparkSqlInsertIntoOperationOpt.nonEmpty
    val sparkSqlInsertIntoOperation = sparkSqlInsertIntoOperationOpt.getOrElse(SPARK_SQL_INSERT_INTO_OPERATION.defaultValue())
    val insertDupPolicyOpt = combinedOpts.get(INSERT_DUP_POLICY.key())
    val insertDupPolicySet = insertDupPolicyOpt.nonEmpty
    val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), INSERT_DUP_POLICY.defaultValue())
    val isNonStrictMode = insertMode == InsertMode.NON_STRICT
    val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
    val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && hoodieCatalogTable.primaryKeys.nonEmpty

    /*
     * The sql write operation has higher precedence than the legacy insert mode.
     * Legacy configs are honored only when the legacy insert mode is explicitly set
     * without the sql write operation being set. In all other cases (i.e., when both
     * are set, neither is set, or only the sql write operation is set), the sql write
     * operation is honored.
     */
    val useLegacyInsertModeFlow = insertModeSet && !sparkSqlInsertIntoOperationSet
    var operation = combinedOpts.getOrElse(OPERATION.key,
      if (useLegacyInsertModeFlow) {
        // NOTE: The target operation can be overridden by the user; if it has been provided
        //       as an input, we prefer that value over the auto-deduced operation. Otherwise,
        //       we deduce the target operation type
        deduceOperation(enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate,
          isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode, shouldAutoKeyGen)
      } else {
        deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition, isOverwriteTable,
          shouldAutoKeyGen, preCombineField, sparkSqlInsertIntoOperationSet, sparkSqlInsertIntoOperation)
      }
    )

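    // When bulk insert is (or is upgraded to be) the effective operation for an overwrite,
    // the intended overwrite semantics are recorded through an internal config so they are
    // not lost by switching the operation to bulk_insert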
    val overwriteTableOpts = if (operation.equals(BULK_INSERT_OPERATION_OPT_VAL)) {
      if (isOverwriteTable) {
        Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
      } else if (isOverwritePartition) {
        Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
      } else {
        Map()
      }
    } else if (operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)) {
      if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
        operation = BULK_INSERT_OPERATION_OPT_VAL
        Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
      } else {
        Map()
      }
    } else if (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
      if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
        operation = BULK_INSERT_OPERATION_OPT_VAL
        Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
      } else {
        Map()
      }
    } else {
      Map()
    }

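    // Propagate any statically resolved overwrite partition path through an internal config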
    val staticOverwritePartitionPathOptions = staticOverwritePartitionPathOpt match {
      case Some(staticOverwritePartitionPath) =>
        Map(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS.key() -> staticOverwritePartitionPath)
      case _ =>
        Map()
    }

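    // Defaults that apply unless explicitly overridden: dedup-before-insert behavior,
    // the SQL key generator wrapper, and the meta/hive sync settings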
    val defaultOpts = Map(
      // NOTE: By default, insert attempts deduplication whenever a pre-combine column is
      //       specified for the table
      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(combineBeforeInsert),
      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
      SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
    )
    // For auto record key generation: only pass the record key fields explicitly when the
    // table defines a composite primary key; otherwise leave the config unset (null)
    val recordKeyConfigValue = if (hoodieCatalogTable.primaryKeys.length > 1) {
      hoodieCatalogTable.primaryKeys.mkString(",")
    } else {
      null
    }

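    // Options that must take precedence over both the defaults and the table config:
    // the resolved path, table identity, the deduced operation, and partitioning/key settings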
    val overridingOpts = extraOptions ++ Map(
      "path" -> path,
      TABLE_TYPE.key -> tableType,
      DATABASE_NAME.key -> hoodieCatalogTable.table.database,
      TBL_NAME.key -> hoodieCatalogTable.tableName,
      OPERATION.key -> operation,
      HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
      URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
      RECORDKEY_FIELD.key -> recordKeyConfigValue,
      PRECOMBINE_FIELD.key -> preCombineField,
      PARTITIONPATH_FIELD.key -> getPartitionPathFieldWriteConfig(
        keyGeneratorClassName, partitionFieldsStr, hoodieCatalogTable)
    ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow, combinedOpts) ++ staticOverwritePartitionPathOptions

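    // Produce the final write config by merging the defaults computed above with the
    // table config, the session conf, and the overriding options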
    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
  }
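
Usage sketch (illustrative only): a minimal, hypothetical invocation for a plain INSERT INTO
with bulk insert requested via the sql write-operation config. The values `spark` and
`catalogTable` are assumptions here — an active SparkSession and a resolved HoodieCatalogTable —
and are not defined in the excerpt above.

  // Hypothetical caller; named arguments are used because extraOptions
  // follows a defaulted parameter in the signature
  val writeConfig: Map[String, String] = buildHoodieInsertConfig(
    hoodieCatalogTable = catalogTable,
    sparkSession = spark,
    isOverwritePartition = false,
    isOverwriteTable = false,
    insertPartitions = Map.empty,        // no static partition values supplied
    extraOptions = Map(
      DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key ->
        DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL
    )
  )
  // writeConfig should now carry "bulk_insert" as the deduced operation, along with the
  // key generator, partitioning, and sync settings, ready for the datasource writer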