in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala [166:311]
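/**
 * Builds the Hudi write config for a SQL INSERT INTO / INSERT OVERWRITE statement.
 *
 * Deduces the target write operation (insert, bulk insert, insert overwrite, ...) from either the
 * legacy insert mode or the spark sql insert-into operation settings, and combines the resulting
 * options with the table config, session config and any caller-supplied overrides.
 */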
def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
                            sparkSession: SparkSession,
                            isOverwritePartition: Boolean,
                            isOverwriteTable: Boolean,
                            insertPartitions: Map[String, Option[String]] = Map.empty,
                            extraOptions: Map[String, String],
                            staticOverwritePartitionPathOpt: Option[String] = Option.empty): Map[String, String] = {
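  // Explicitly specified insert partitions must match the table's partition fields exactly.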
  if (insertPartitions.nonEmpty &&
    (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
    throw new IllegalArgumentException(s"Insert partition fields " +
      s"[${insertPartitions.keys.mkString(",")}]" +
      s" do not match the partition fields defined in the table [${hoodieCatalogTable.partitionFields.mkString(",")}]")
  }
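  // Resolve the table location/type and merge the table config, session config and caller-supplied options.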
  val path = hoodieCatalogTable.tableLocation
  val tableType = hoodieCatalogTable.tableTypeName
  val tableConfig = hoodieCatalogTable.tableConfig
  val combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
    defaultOpts = Map.empty, overridingOpts = extraOptions)
  val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig, extraOptions)
  val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
  // NOTE: Here we fall back to "" to make sure that a null value is not overridden with
  //       the default value ("ts")
  // TODO(HUDI-3456) clean up
  val preCombineField = combinedOpts.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key,
    combinedOpts.getOrElse(PRECOMBINE_FIELD.key, ""))
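  // Partitioning behavior and the key generator class come from the table config, with fallbacks when unset.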
  val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
  val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
  val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
    .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
  val enableBulkInsert = combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
    DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
  val dropDuplicate = sparkSession.conf
    .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
  val shouldAutoKeyGen: Boolean = shouldAutoGenerateRecordKeys(combinedOpts)
  val insertMode = InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
    DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
  val insertModeSet = combinedOpts.contains(SQL_INSERT_MODE.key)
  val sparkSqlInsertIntoOperationOpt = combinedOpts.get(SPARK_SQL_INSERT_INTO_OPERATION.key())
  val sparkSqlInsertIntoOperationSet = sparkSqlInsertIntoOperationOpt.nonEmpty
  val sparkSqlInsertIntoOperation = sparkSqlInsertIntoOperationOpt.getOrElse(SPARK_SQL_INSERT_INTO_OPERATION.defaultValue())
  val insertDupPolicyOpt = combinedOpts.get(INSERT_DUP_POLICY.key())
  val insertDupPolicySet = insertDupPolicyOpt.nonEmpty
  val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), INSERT_DUP_POLICY.defaultValue())
  val isNonStrictMode = insertMode == InsertMode.NON_STRICT
  val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
  val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && hoodieCatalogTable.primaryKeys.nonEmpty
  /*
   * The sql write operation has higher precedence than the legacy insert mode.
   * Only when the legacy insert mode is explicitly set, without setting the sql write operation,
   * are the legacy configs honored. In all other cases (i.e. when both are set, when neither is set,
   * or when only the sql write operation is set), we honor the sql write operation.
   */
  val useLegacyInsertModeFlow = insertModeSet && !sparkSqlInsertIntoOperationSet
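  // An explicitly configured OPERATION always wins; otherwise deduce it from either the legacy
  // insert-mode flow or the spark sql insert-into operation flow.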
  var operation = combinedOpts.getOrElse(OPERATION.key,
    if (useLegacyInsertModeFlow) {
      // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input
      //       we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
      deduceOperation(enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate,
        isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode, shouldAutoKeyGen)
    } else {
      deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition, isOverwriteTable,
        shouldAutoKeyGen, preCombineField, sparkSqlInsertIntoOperationSet, sparkSqlInsertIntoOperation)
    }
  )
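  // For INSERT OVERWRITE statements, record the intended overwrite semantics (whole table vs. partitions)
  // for the bulk insert writer; insert-overwrite operations are upgraded to bulk insert when bulk insert is enabled.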
  val overwriteTableOpts = if (operation.equals(BULK_INSERT_OPERATION_OPT_VAL)) {
    if (isOverwriteTable) {
      Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
    } else if (isOverwritePartition) {
      Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
    } else {
      Map()
    }
  } else if (operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)) {
    if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
      operation = BULK_INSERT_OPERATION_OPT_VAL
      Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
    } else {
      Map()
    }
  } else if (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
    if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
      operation = BULK_INSERT_OPERATION_OPT_VAL
      Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
    } else {
      Map()
    }
  } else {
    Map()
  }
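  // Pass through any statically resolved partition paths for INSERT OVERWRITE with static partition values.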
  val staticOverwritePartitionPathOptions = staticOverwritePartitionPathOpt match {
    case Some(staticOverwritePartitionPath) =>
      Map(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS.key() -> staticOverwritePartitionPath)
    case _ =>
      Map()
  }
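  // Defaults applied only when not overridden elsewhere: pre-insert deduplication, SQL key generator wiring,
  // and meta/hive sync settings derived from the hive sync config.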
  val defaultOpts = Map(
    // NOTE: By default, insert will attempt deduplication when a pre-combine column is specified
    //       for the table
    HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(combineBeforeInsert),
    KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
    SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
    SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
    HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
    HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
    HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
    HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
    HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
    HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
    HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
    HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
  )
  // for auto record key gen
  val recordKeyConfigValue = if (hoodieCatalogTable.primaryKeys.length > 1) {
    hoodieCatalogTable.primaryKeys.mkString(",")
  } else {
    null
  }
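  // Highest-precedence options: caller-supplied extras plus table identity, the deduced operation,
  // partitioning settings and the (possibly empty) record/precombine key fields.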
  val overridingOpts = extraOptions ++ Map(
    "path" -> path,
    TABLE_TYPE.key -> tableType,
    DATABASE_NAME.key -> hoodieCatalogTable.table.database,
    TBL_NAME.key -> hoodieCatalogTable.tableName,
    OPERATION.key -> operation,
    HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
    URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
    RECORDKEY_FIELD.key -> recordKeyConfigValue,
    PRECOMBINE_FIELD.key -> preCombineField,
    PARTITIONPATH_FIELD.key -> getPartitionPathFieldWriteConfig(
      keyGeneratorClassName, partitionFieldsStr, hoodieCatalogTable)
  ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow, combinedOpts) ++ staticOverwritePartitionPathOptions

  combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
    defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}