in hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala [59:229]
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case TimeTravelRelation(ResolvesToHudiTable(table), timestamp, version) =>
if (timestamp.isEmpty && version.nonEmpty) {
throw new AnalysisException("Version expression is not supported for time travel")
}
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val dataSource =
DataSource(
spark,
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
options = table.storage.properties ++ pathOption ++ Map(
DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key -> timestamp.get.toString()),
catalogTable = Some(table))
val relation = dataSource.resolveRelation(checkFilesExist = false)
LogicalRelation(relation, table)
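// Illustration, not part of the original file: a Spark SQL time-travel query such as
//   SELECT * FROM hudi_tbl TIMESTAMP AS OF '2024-01-01 00:00:00'
// arrives here as a TimeTravelRelation; the case above rewrites it into a LogicalRelation whose
// read options carry TIME_TRAVEL_AS_OF_INSTANT, while a VERSION AS OF expression is rejected
// as unsupported.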
case HoodieQuery(args) =>
val (tableName, opts) = HoodieQuery.parseOptions(args)
val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
val hoodieDataSource = new DefaultSource
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
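// Illustration with an assumed table-valued-function name, not part of the original file:
//   SELECT * FROM hudi_query('db.hudi_tbl', 'read_optimized')
// would hit the case above, which resolves the identifier through the session catalog and
// builds the relation via DefaultSource with the catalog table's location as the "path" option.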
case HoodieTableChanges(args) =>
val (tablePath, opts) = HoodieTableChangesOptionsParser.parseOptions(args, HoodieTableChanges.FUNC_NAME)
val hoodieDataSource = new DefaultSource
if (tablePath.contains(StoragePath.SEPARATOR)) {
// the first param is table path
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath))
LogicalRelation(relation)
} else {
// the first param is table identifier
val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
}
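// Illustration with assumed TVF name and arguments, not part of the original file:
//   SELECT * FROM hudi_table_changes('db.hudi_tbl', 'latest_state', 'earliest')
// The case above accepts either a catalog identifier or a filesystem path as the first
// argument, telling them apart by the presence of a path separator.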
case HoodieTimelineTableValuedFunction(args) =>
val (tablePath, opts) = HoodieTimelineTableValuedFunctionOptionsParser.parseOptions(args, HoodieTimelineTableValuedFunction.FUNC_NAME)
val hoodieDataSource = new DefaultSource
if (tablePath.contains(StoragePath.SEPARATOR)) {
// the first param is table path
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath))
LogicalRelation(relation)
} else {
// the first param is table identifier
val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
}
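// Illustration with an assumed TVF name, not part of the original file:
//   SELECT * FROM hudi_query_timeline('db.hudi_tbl')
// The timeline case above applies the same path-vs-identifier dispatch as hudi_table_changes.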
case HoodieFileSystemViewTableValuedFunction(args) =>
val (tablePath, opts) = HoodieFileSystemViewTableValuedFunctionOptionsParser.parseOptions(args, HoodieFileSystemViewTableValuedFunction.FUNC_NAME)
val hoodieDataSource = new DefaultSource
if (tablePath.contains(StoragePath.SEPARATOR)) {
// the first param is table path
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath))
LogicalRelation(relation)
} else {
// the first param is table identifier
val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
}
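// Illustration with assumed TVF name and arguments, not part of the original file:
//   SELECT * FROM hudi_filesystem_view('/tmp/hudi/tbl', 'dt=2024%')
// The file-system-view case above again dispatches on whether the first argument is a path
// or a table identifier.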
case HoodieMetadataTableValuedFunction(args) =>
val (tablePath, opts) = HoodieMetadataTableValuedFunction.parseOptions(args, HoodieMetadataTableValuedFunction.FUNC_NAME)
val hoodieDataSource = new DefaultSource
if (tablePath.contains(StoragePath.SEPARATOR)) {
// the first param is table path
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> (tablePath + "/.hoodie/metadata")))
LogicalRelation(relation)
} else {
// the first param is table identifier
val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
(catalogTable.location.toString + "/.hoodie/metadata")))
LogicalRelation(relation, catalogTable)
}
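// Illustration with an assumed TVF name, not part of the original file:
//   SELECT * FROM hudi_metadata('db.hudi_tbl')
// The metadata case above differs from the other functions only in appending
// "/.hoodie/metadata" to the resolved location, so the relation reads the metadata table.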
case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
// START: custom Hudi change: we don't want to fall through to Spark's MergeIntoTable resolution,
// so we resolve the source and target plans here if they haven't been resolved already
if !mO.resolved =>
lazy val analyzer = spark.sessionState.analyzer
val targetTable = if (targetTableO.resolved) targetTableO else analyzer.execute(targetTableO)
val sourceTable = if (sourceTableO.resolved) sourceTableO else analyzer.execute(sourceTableO)
val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable, sourceTable = sourceTable)
// END: custom Hudi change
EliminateSubqueryAliases(targetTable) match {
case r: NamedRelation if r.skipSchemaResolution =>
// Do not resolve the expression if the target table accepts any schema.
// This allows data sources to customize their own resolution logic using
// custom resolution rules.
m
case _ =>
val newMatchedActions = m.matchedActions.map {
case DeleteAction(deleteCondition) =>
val resolvedDeleteCondition = deleteCondition.map(
resolveExpressionByPlanChildren(_, m))
DeleteAction(resolvedDeleteCondition)
case UpdateAction(updateCondition, assignments) =>
val resolvedUpdateCondition = updateCondition.map(
resolveExpressionByPlanChildren(_, m))
UpdateAction(
resolvedUpdateCondition,
// The update value can access columns from both target and source tables.
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
case UpdateStarAction(updateCondition) =>
// START: custom Hudi change: filter out meta fields
val assignments = targetTable.output.filter(a => !isMetaField(a.name)).map { attr =>
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
// END: custom Hudi change
UpdateAction(
updateCondition.map(resolveExpressionByPlanChildren(_, m)),
// For UPDATE *, the values must come from the source table.
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case o => o
}
val newNotMatchedActions = m.notMatchedActions.map {
case InsertAction(insertCondition, assignments) =>
// The insert action is used when not matched, so its condition and value can only
// access columns from the source table.
val resolvedInsertCondition = insertCondition.map(
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
InsertAction(
resolvedInsertCondition,
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case InsertStarAction(insertCondition) =>
// The insert action is used when not matched, so its condition and value can only
// access columns from the source table.
val resolvedInsertCondition = insertCondition.map(
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
// START: custom Hudi change: filter out meta fields
val assignments = targetTable.output.filter(a => !isMetaField(a.name)).map { attr =>
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
// END: custom Hudi change
InsertAction(
resolvedInsertCondition,
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case o => o
}
val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m)
m.copy(mergeCondition = resolvedMergeCondition,
matchedActions = newMatchedActions,
notMatchedActions = newNotMatchedActions)
}
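// Illustration, not part of the original file: for a statement such as
//   MERGE INTO target t USING source s ON t.id = s.id
//   WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
// the case above resolves the merge condition and expands the UPDATE */INSERT * assignments
// itself, skipping Hudi meta columns such as _hoodie_commit_time, rather than deferring to
// Spark's built-in MergeIntoTable resolution.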
case cmd: CreateIndex if cmd.table.resolved && cmd.columns.exists(_._1.isInstanceOf[UnresolvedFieldName]) =>
cmd.copy(columns = cmd.columns.map {
case (u: UnresolvedFieldName, prop) => resolveFieldNames(cmd.table, u.name, u) -> prop
case other => other
})
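// Illustration with assumed SQL syntax, not part of the original file: for
//   CREATE INDEX idx_price ON hudi_tbl (price)
// the case above resolves any still-unresolved field names in the column list against the
// already-resolved table before the command runs.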
}