in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala [214:549]
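The method below is the core write path of the Hudi Spark datasource: it is shared by batch and Structured Streaming writes, and returns the commit outcome together with the write client and the resolved table config.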
private def writeInternal(sqlContext: SQLContext,
                          mode: SaveMode,
                          optParams: Map[String, String],
                          sourceDf: DataFrame,
                          streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
                          hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
(Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
  assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
  val path = optParams("path")
  val basePath = new Path(path)
  val spark = sqlContext.sparkSession
  val sparkContext = sqlContext.sparkContext
  val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
  tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
  var tableConfig = getHoodieTableConfig(sparkContext, path, mode, streamingWritesParamsOpt.map(_.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
  // Get the write params without injecting defaults, and validate them
  val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
  val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
  val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
    originKeyGeneratorClassName, paramsWithoutDefaults)
  // Validate that the datasource and table-config key generators are the same
  validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
  validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
  asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
  asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
  // Re-use existing table configs and inject defaults
  var (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
  val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
  val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
    s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
  val tableIdentifier = TableIdentifier(tblName, if (databaseName.isEmpty) None else Some(databaseName))
  assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
    s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
  sparkContext.getConf.getOption("spark.serializer") match {
    case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
    case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
  }
  val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
  val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults, sourceDf)
  val preppedSparkSqlMergeInto = parameters.getOrElse(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
  val preppedSparkSqlWrites = parameters.getOrElse(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean
  val preppedWriteOperation = canDoPreppedWrites(hoodieConfig, parameters, operation, sourceDf)
  val jsc = new JavaSparkContext(sparkContext)
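  // For streaming writes with async compaction wired in, run the ingestion job in a dedicated
  // scheduler pool when a fair-scheduler allocation file has been configured.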
  if (streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get) {
    if (jsc.getConf.getOption(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
      jsc.setLocalProperty("spark.scheduler.pool", SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
    }
  }
  val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(hoodieConfig.getProps))
  val tableVersion = Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION))
  if (mode == SaveMode.Ignore && tableExists) {
    log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
    (false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
  } else {
    // Handle various save modes
    handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
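    // Two views of the partition columns are derived: the plain column names (used later when dropping
    // partition columns from the data-file schema) and the key-generator form recorded in the table config.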
    val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters), false)
    val partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters), HoodieTableVersion.fromVersionCode(tableVersion))
    val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
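    // Load the meta client for an existing table, or bootstrap a brand-new Hudi table under the base path
    // from the resolved table properties.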
    val tableMetaClient = if (tableExists) {
      HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
      HoodieTableMetaClient.builder
        .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration))
        .setBasePath(path)
        .build()
    } else {
      val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
      val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_HISTORY_PATH)
      val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
      val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)
      val payloadClass = hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)
      val recordMergeStrategyId = hoodieConfig.getString(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID)
      val keyGenProp =
        if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
          hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
        else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
      HoodieTableMetaClient.newTableBuilder()
        .setTableType(tableType)
        .setTableVersion(tableVersion)
        .setDatabaseName(databaseName)
        .setTableName(tblName)
        .setBaseFileFormat(baseFileFormat)
        .setArchiveLogFolder(archiveLogFolder)
        // We can't fetch the preCombine field from the hoodieConfig object, since it falls back to "ts" as the
        // default value; we are interested in what the user actually set, hence we fetch it from optParams.
        .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
        .setPartitionFields(partitionColumnsForKeyGenerator)
        .setPopulateMetaFields(populateMetaFields)
        .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
        .setSecondaryKeyFields(hoodieConfig.getString(SECONDARYKEY_COLUMN_NAME))
        .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
        .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
        .setKeyGeneratorClassProp(keyGenProp)
        .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]])
        .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
        .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
        .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
        .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
        .setCommitTimezone(timelineTimeZone)
        .setPayloadClassName(payloadClass)
        .setRecordMergeStrategyId(recordMergeStrategyId)
        .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
        .setMultipleBaseFileFormatsEnabled(hoodieConfig.getBoolean(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE))
        .initTable(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration), path)
    }
    // Take care of the partition-level bucket index (a variant of the simple bucket index).
    // The BUCKET_RESCALE action sets the related configs in its call command, so it is skipped here.
    if (hoodieConfig.getStringOrDefault(HoodieIndexConfig.INDEX_TYPE, "") == HoodieIndex.IndexType.BUCKET.name
      && PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(tableMetaClient.getStorageConf, basePath.toString)
      && hoodieConfig.getStringOrDefault(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE, "") != WriteOperationType.BUCKET_RESCALE.value()) {
      val latestHashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(tableMetaClient)
      hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key, latestHashingConfig.getExpressions)
      hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, latestHashingConfig.getDefaultBucketNumber.toString)
      hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key, latestHashingConfig.getRule)
      parameters = parameters ++ Map(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key -> latestHashingConfig.getExpressions)
      parameters = parameters ++ Map(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key -> latestHashingConfig.getDefaultBucketNumber.toString)
      parameters = parameters ++ Map(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key -> latestHashingConfig.getRule)
    }
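    // Refresh the table config from the (possibly just initialized) meta client; it drives the commit
    // action type below.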
    var instantTime: String = null
    tableConfig = tableMetaClient.getTableConfig
    val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
    // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
    sparkContext.getConf.registerKryoClasses(
      Array(classOf[GenericData],
        classOf[Schema]))
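    // Resolve the source schema (from the incoming DataFrame) and the latest table schema; the writer
    // schema is deduced from these in the generic write path further below.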
    val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
    val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
    val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {
      sourceDf
    } else {
      sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala.toSeq: _*)
    }
    // NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
    //       Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
    //       play crucial role in establishing compatibility b/w schemas
    val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s => (s.getName, s.getNamespace))
      .getOrElse(getAvroRecordNameAndNamespace(tblName))
    val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
    val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
      // In case we need to reconcile the schema and schema evolution is enabled,
      // we will force-apply schema evolution to the writer's schema
      if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
        val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
        Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
      } else {
        None
      }
    }
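    // Dispatch on the write operation: DELETE and DELETE_PARTITION take dedicated paths; every other
    // operation (insert, upsert, bulk_insert, insert_overwrite, ...) goes through the generic path below.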
    val (writeResult: HoodieWriteResult, writeClient: SparkRDDWriteClient[_]) =
      operation match {
        case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
          mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, hoodieConfig)
          val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace)
          // Convert to an RDD of HoodieKey (plus the current record location, for prepped writes)
          val hoodieKeysAndLocationsToDelete = genericRecords.mapPartitions(it => {
            val keyGenerator: Option[BaseKeyGenerator] = if (preppedSparkSqlWrites || preppedWriteOperation) {
              None
            } else {
              Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(hoodieConfig.getProps))
                .asInstanceOf[BaseKeyGenerator])
            }
            it.map { avroRec =>
              HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator, avroRec, preppedSparkSqlWrites || preppedWriteOperation, preppedSparkSqlWrites || preppedSparkSqlMergeInto || preppedWriteOperation)
            }
          }).toJavaRDD()
          if (!tableExists) {
            throw new HoodieException(s"hoodie table at $basePath does not exist")
          }
          // Create a HoodieWriteClient & issue the delete.
          val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
            null, path, tblName,
            (addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
            .asInstanceOf[SparkRDDWriteClient[_]]
          if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
            streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
          }
          if (isAsyncClusteringEnabled(client, parameters)) {
            streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
          }
          instantTime = client.createNewInstantTime()
          // Issue deletes
          client.startCommitWithTime(instantTime, commitActionType)
          val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysAndLocationsToDelete, instantTime, preppedSparkSqlWrites || preppedWriteOperation)
          (writeStatuses, client)
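        // DELETE_PARTITION drops whole partitions, either listed explicitly via
        // DataSourceWriteOptions.PARTITIONS_TO_DELETE (wildcards are resolved) or derived from the
        // partition paths of the incoming records.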
        case WriteOperationType.DELETE_PARTITION =>
          if (!tableExists) {
            throw new HoodieException(s"hoodie table at $basePath does not exist")
          }
          val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(hoodieConfig.getProps))
          val tableMetaClient = HoodieTableMetaClient.builder
            .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration))
            .setBasePath(basePath.toString).build()
          // Get the list of partitions to delete
          val partitionsToDelete = if (parameters.contains(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
            val partitionColsToDelete = parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
            java.util.Arrays.asList(resolvePartitionWildcards(java.util.Arrays.asList(partitionColsToDelete: _*).asScala.toList, jsc,
              tableMetaClient.getStorage, hoodieConfig, basePath.toString): _*)
          } else {
            val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace)
            genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
          }
          // Resolve the table schema and create a HoodieWriteClient to issue the delete with.
          val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
            schemaStr, path, tblName,
            (parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
            .asInstanceOf[SparkRDDWriteClient[_]]
          // Issue the delete-partitions operation
          instantTime = client.createNewInstantTime()
          client.startCommitWithTime(instantTime, commitActionType)
          val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
          (writeStatuses, client)
        // Here all other (than DELETE, DELETE_PARTITION) write operations are handled
        case _ =>
          // NOTE: Target writer's schema is deduced based on
          //         - Source's schema
          //         - Existing table's schema (including its Hudi's [[InternalSchema]] representation)
          val writerSchema = HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)
          validateSchemaForHoodieIsDeleted(writerSchema)
          mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, hoodieConfig)
          // Check whether partition columns should be persisted w/in the data-files, or should
          // be instead omitted from them and simply encoded into the partition path (which is Spark's
          // behavior by default)
          // TODO move partition columns handling down into the handlers
          val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
          val dataFileSchema = if (shouldDropPartitionColumns) {
            val truncatedSchema = generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
            // NOTE: We have to register this schema w/ Kryo to make sure it's able to apply an optimization
            //       allowing it to avoid the need to ser/de the whole schema along _every_ Avro record
            registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
            truncatedSchema
          } else {
            writerSchema
          }
          // Remove the meta columns from the data-file schema when this is a prepped write.
          val processedDataSchema = if (preppedSparkSqlWrites || preppedSparkSqlMergeInto || preppedWriteOperation) {
            HoodieAvroUtils.removeMetadataFields(dataFileSchema)
          } else {
            dataFileSchema
          }
          // Create a HoodieWriteClient & issue the write.
          val client = hoodieWriteClient.getOrElse {
            val finalOpts = addSchemaEvolutionParameters(parameters, internalSchemaOpt, Some(writerSchema)) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key
            // TODO(HUDI-4772) proper writer-schema has to be specified here
            DataSourceUtils.createHoodieClient(jsc, processedDataSchema.toString, path, tblName, finalOpts.asJava)
          }
          if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
            streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
          }
          if (isAsyncClusteringEnabled(client, parameters)) {
            streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
          }
          // Short-circuit if bulk_insert via row is enabled.
          // scalastyle:off
          if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
            return bulkInsertAsRow(client, parameters, hoodieConfig, df, mode, tblName, basePath, writerSchema, tableConfig)
          }
          // scalastyle:on
          val writeConfig = client.getConfig
          if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
            throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
          }
          instantTime = client.createNewInstantTime()
          // Convert to RDD[HoodieRecord]
          val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
            HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName,
              avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites,
              preppedSparkSqlMergeInto, preppedWriteOperation))) match {
            case Success(recs) => recs
            case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
          }
          // Remove duplicates from incoming records based on existing keys from storage.
          val dedupedHoodieRecords = handleInsertDuplicates(hoodieRecords, hoodieConfig, operation, jsc, parameters)
          client.startCommitWithTime(instantTime, commitActionType)
          try {
            val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
              preppedSparkSqlWrites || preppedWriteOperation)
            (writeResult, client)
          } catch {
            case e: HoodieException =>
              // close the write client in all cases
              handleWriteClientClosure(client, tableConfig, parameters, jsc.hadoopConfiguration())
              throw e
          }
      }
    // Check for errors and commit the write.
    try {
      val (writeSuccessful, compactionInstant, clusteringInstant) =
        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
          writeResult, parameters, writeClient, tableConfig, jsc,
          TableInstantInfo(basePath, instantTime, commitActionType, operation), streamingWritesParamsOpt.map(_.extraPreCommitFn)
            .orElse(Option.apply(Option.empty)).get)
      (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
    } finally {
      handleWriteClientClosure(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
    }
  }
}
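For context, here is a minimal sketch (not part of the file) of how this write path is typically reached from user code through the Hudi Spark datasource. The app name, table name, field names, and paths below are hypothetical; the option keys are the standard Hudi datasource write options.

  import org.apache.spark.sql.{SaveMode, SparkSession}

  // Kryo is mandatory, as enforced by the spark.serializer check in writeInternal above.
  val spark = SparkSession.builder()
    .appName("hudi-write-sketch")
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  val df = spark.read.json("/tmp/source_data")  // hypothetical input

  df.write.format("hudi")
    .option("hoodie.table.name", "example_table")                        // HoodieWriteConfig.TBL_NAME
    .option("hoodie.datasource.write.recordkey.field", "uuid")           // RECORDKEY_FIELD
    .option("hoodie.datasource.write.precombine.field", "ts")            // PRECOMBINE_FIELD
    .option("hoodie.datasource.write.partitionpath.field", "partition")  // partition path field
    .option("hoodie.datasource.write.operation", "upsert")               // deduced into WriteOperationType.UPSERT
    .mode(SaveMode.Append)
    .save("/tmp/hudi/example_table")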