in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala [276:418]
/**
 * Resolves the concrete [[BaseRelation]] used to serve a read against a Hudi table.
 *
 * Selection is driven by (table type, query type, bootstrap flag) plus a few special-case
 * options: timeline/filesystem meta-relations, CDC-format incremental reads, and the
 * file-group-reader ("new parquet file format") fast path.
 *
 * @param sqlContext active [[SQLContext]]
 * @param metaClient meta client for the target Hudi table
 * @param schema     catalog-provided schema (may be ignored, see Hive note below)
 * @param globPaths  optional glob paths restricting the read; non-empty globs disable the
 *                   file-group reader path
 * @param parameters read options supplied by the caller
 * @return the relation matching the requested query shape
 * @throws HoodieException if the (tableType, queryType, bootstrap) combination is unsupported
 */
def createRelation(sqlContext: SQLContext,
                   metaClient: HoodieTableMetaClient,
                   schema: StructType,
                   globPaths: Seq[StoragePath],
                   parameters: Map[String, String]): BaseRelation = {
  val tableType = metaClient.getTableType
  val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
  val queryType = SparkConfigUtils.getStringWithAltKeys(parameters, QUERY_TYPE)
  // CDC reads are expressed as incremental queries with the CDC incremental format
  val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
    parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
  val createTimeLineRln = parameters.get(DataSourceReadOptions.CREATE_TIMELINE_RELATION.key())
  val createFSRln = parameters.get(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key())

  // Effective table version an incremental read must honor: an explicit
  // INCREMENTAL_READ_TABLE_VERSION option takes precedence over the version
  // recorded in the table config. Version >= 8 selects the V2 incremental path.
  def incrementalReadTableVersion: Int =
    if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
      Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
    } else {
      metaClient.getTableConfig.getTableVersion.versionCode()
    }

  if (createTimeLineRln.isDefined) {
    // Meta-query over the table's timeline rather than its data
    new TimelineRelation(sqlContext, parameters, metaClient)
  } else if (createFSRln.isDefined) {
    // Meta-query over the table's file-system view rather than its data
    new FileSystemRelation(sqlContext, parameters, metaClient)
  } else {
    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
    // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
    // Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
    // case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
    // from the table itself
    val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
      None
    } else {
      Option(schema)
    }
    // File-group reader path is only usable for non-metadata tables read without glob paths
    val useNewParquetFileFormat = parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean &&
      !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
    if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
      // No completed commits yet: serve an empty relation with a best-effort schema
      new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema)))
    } else if (isCdcQuery) {
      if (useNewParquetFileFormat) {
        if (tableType == COPY_ON_WRITE) {
          new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
            sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
        } else {
          new HoodieMergeOnReadCDCHadoopFsRelationFactory(
            sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
        }
      } else {
        CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
      }
    } else {
      (tableType, queryType, isBootstrappedTable) match {
        // Base-file-only reads (COW snapshot / read-optimized on either table type)
        case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
             (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
             (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
          if (useNewParquetFileFormat) {
            new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
          } else {
            resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
          }
        case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
          if (incrementalReadTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
            if (useNewParquetFileFormat) {
              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
            } else {
              new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
            }
          } else {
            // Pre-0.x-compatible (table version < 8) incremental semantics
            new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
          }
        case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
          if (useNewParquetFileFormat) {
            new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
          } else {
            new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
          }
        case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
          if (useNewParquetFileFormat) {
            new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
              sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
          } else {
            HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
          }
        case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
          if (incrementalReadTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
            if (useNewParquetFileFormat) {
              new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
            } else {
              MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
            }
          } else {
            // Pre-0.x-compatible (table version < 8) incremental semantics
            MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
          }
        // Remaining bootstrapped combinations (e.g. COW snapshot/read-optimized on a bootstrapped table)
        case (_, _, true) =>
          if (useNewParquetFileFormat) {
            new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
              sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
          } else {
            resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
          }
        case (_, _, _) =>
          throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
            s"isBootstrappedTable: $isBootstrappedTable ")
      }
    }
  }
}