def apply()

in hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala [59:229]
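
This resolution rule rewrites Hudi-specific logical plan nodes that Spark's stock analyzer cannot resolve: time-travel relations, the Hudi table-valued functions (query, table changes, timeline, file-system view, metadata), unresolved MERGE INTO plans, and CREATE INDEX column references. The inline "Illustrative" comments below are sketches added for exposition; the SQL shapes are assumed from the parser and FUNC_NAME constants plus Hudi documentation, not taken from this file.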


  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
    case TimeTravelRelation(ResolvesToHudiTable(table), timestamp, version) =>
      if (timestamp.isEmpty && version.nonEmpty) {
        throw new AnalysisException("Version expression is not supported for time travel")
      }

      val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
      val dataSource =
        DataSource(
          spark,
          userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
          partitionColumns = table.partitionColumnNames,
          bucketSpec = table.bucketSpec,
          className = table.provider.get,
          options = table.storage.properties ++ pathOption ++ Map(
            DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key -> timestamp.get.toString()),
          catalogTable = Some(table))

      val relation = dataSource.resolveRelation(checkFilesExist = false)

      LogicalRelation(relation, table)
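
      // Illustrative query handled by the case above (a sketch; accepted instant
      // formats vary by Hudi version):
      //   spark.sql("SELECT * FROM hudi_tbl TIMESTAMP AS OF '20240101000000'")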

    case HoodieQuery(args) =>
      val (tableName, opts) = HoodieQuery.parseOptions(args)

      val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
      val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)

      val hoodieDataSource = new DefaultSource
      val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
        catalogTable.location.toString))

      LogicalRelation(relation, catalogTable)
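
      // Illustrative hudi_query invocation resolved by the case above (signature
      // assumed from Hudi docs; the second option selects the query type):
      //   spark.sql("SELECT * FROM hudi_query('db.hudi_tbl', 'read_optimized')")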

    case HoodieTableChanges(args) =>
      val (tablePath, opts) = HoodieTableChangesOptionsParser.parseOptions(args, HoodieTableChanges.FUNC_NAME)
      val hoodieDataSource = new DefaultSource
      if (tablePath.contains(StoragePath.SEPARATOR)) {
        // the first param is a table path
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath))
        LogicalRelation(relation)
      } else {
        // the first param is a table identifier
        val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
        val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
          catalogTable.location.toString))
        LogicalRelation(relation, catalogTable)
      }
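
      // Illustrative hudi_table_changes invocations covering both branches above
      // (shape assumed from Hudi docs: table name or path, mode, begin time):
      //   spark.sql("SELECT * FROM hudi_table_changes('db.tbl', 'latest_state', 'earliest')")
      //   spark.sql("SELECT * FROM hudi_table_changes('/path/to/tbl', 'cdc', '20240101000000')")
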
    case HoodieTimelineTableValuedFunction(args) =>
      val (tablePath, opts) = HoodieTimelineTableValuedFunctionOptionsParser.parseOptions(args, HoodieTimelineTableValuedFunction.FUNC_NAME)
      val hoodieDataSource = new DefaultSource
      if (tablePath.contains(StoragePath.SEPARATOR)) {
        // the first param is a table path
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath))
        LogicalRelation(relation)
      } else {
        // the first param is a table identifier
        val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
        val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
          catalogTable.location.toString))
        LogicalRelation(relation, catalogTable)
      }
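
      // Illustrative timeline query, assuming FUNC_NAME resolves to
      // 'hudi_query_timeline' and that the optional second arg toggles inclusion
      // of the archived timeline:
      //   spark.sql("SELECT * FROM hudi_query_timeline('db.tbl', 'true')")
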
    case HoodieFileSystemViewTableValuedFunction(args) =>
      val (tablePath, opts) = HoodieFileSystemViewTableValuedFunctionOptionsParser.parseOptions(args, HoodieFileSystemViewTableValuedFunction.FUNC_NAME)
      val hoodieDataSource = new DefaultSource
      if (tablePath.contains(StoragePath.SEPARATOR)) {
        // the first param is a table path
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath))
        LogicalRelation(relation)
      } else {
        // the first param is a table identifier
        val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
        val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
          catalogTable.location.toString))
        LogicalRelation(relation, catalogTable)
      }
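
      // Illustrative file-system-view query, assuming FUNC_NAME resolves to
      // 'hudi_filesystem_view' and that the second arg is a partition path filter:
      //   spark.sql("SELECT * FROM hudi_filesystem_view('db.tbl', 'dt=2024%')")
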
    case HoodieMetadataTableValuedFunction(args) =>
      val (tablePath, opts) = HoodieMetadataTableValuedFunction.parseOptions(args, HoodieMetadataTableValuedFunction.FUNC_NAME)
      val hoodieDataSource = new DefaultSource
      if (tablePath.contains(StoragePath.SEPARATOR)) {
        // the first param is a table path
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> (tablePath + "/.hoodie/metadata")))
        LogicalRelation(relation)
      } else {
        // the first param is a table identifier
        val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
        val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
          (catalogTable.location.toString + "/.hoodie/metadata")))
        LogicalRelation(relation, catalogTable)
      }
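
      // Illustrative metadata query, assuming FUNC_NAME resolves to
      // 'hudi_metadata'; note the case above appends '/.hoodie/metadata', so the
      // relation reads the metadata table rather than the data table:
      //   spark.sql("SELECT * FROM hudi_metadata('db.tbl')")
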
    case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
      // START: custom Hudi change: we don't want to fall through to Spark's MergeIntoTable
      // resolution, so we resolve the source and target here if they haven't been resolved yet
      if !mO.resolved =>
      lazy val analyzer = spark.sessionState.analyzer
      val targetTable = if (targetTableO.resolved) targetTableO else analyzer.execute(targetTableO)
      val sourceTable = if (sourceTableO.resolved) sourceTableO else analyzer.execute(sourceTableO)
      val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable, sourceTable = sourceTable)
      // END: custom Hudi change
      EliminateSubqueryAliases(targetTable) match {
        case r: NamedRelation if r.skipSchemaResolution =>
          // Do not resolve the expression if the target table accepts any schema.
          // This allows data sources to customize their own resolution logic using
          // custom resolution rules.
          m

        case _ =>
          val newMatchedActions = m.matchedActions.map {
            case DeleteAction(deleteCondition) =>
              val resolvedDeleteCondition = deleteCondition.map(
                resolveExpressionByPlanChildren(_, m))
              DeleteAction(resolvedDeleteCondition)
            case UpdateAction(updateCondition, assignments) =>
              val resolvedUpdateCondition = updateCondition.map(
                resolveExpressionByPlanChildren(_, m))
              UpdateAction(
                resolvedUpdateCondition,
                // The update value can access columns from both target and source tables.
                resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
            case UpdateStarAction(updateCondition) =>
              // START: custom Hudi change: filter out meta fields
              val assignments = targetTable.output.filter(a => !isMetaField(a.name)).map { attr =>
                Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
              }
              // END: custom Hudi change
              UpdateAction(
                updateCondition.map(resolveExpressionByPlanChildren(_, m)),
                // For UPDATE *, the values must come from the source table.
                resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
            case o => o
          }
          val newNotMatchedActions = m.notMatchedActions.map {
            case InsertAction(insertCondition, assignments) =>
              // The insert action is used when not matched, so its condition and value can only
              // access columns from the source table.
              val resolvedInsertCondition = insertCondition.map(
                resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
              InsertAction(
                resolvedInsertCondition,
                resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
            case InsertStarAction(insertCondition) =>
              // The insert action is used when not matched, so its condition and value can only
              // access columns from the source table.
              val resolvedInsertCondition = insertCondition.map(
                resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
              // START: custom Hudi change: filter out meta fields
              val assignments = targetTable.output.filter(a => !isMetaField(a.name)).map { attr =>
                Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
              }
              // END: custom Hudi change
              InsertAction(
                resolvedInsertCondition,
                resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
            case o => o
          }
          val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m)
          m.copy(mergeCondition = resolvedMergeCondition,
            matchedActions = newMatchedActions,
            notMatchedActions = newNotMatchedActions)
      }
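
      // Illustrative statement resolved by the case above; SET * / INSERT * take
      // the UpdateStarAction/InsertStarAction paths, which filter out Hudi meta
      // fields (table and column names here are hypothetical):
      //   spark.sql(
      //     """MERGE INTO hudi_tbl t USING updates s ON t.id = s.id
      //       |WHEN MATCHED THEN UPDATE SET *
      //       |WHEN NOT MATCHED THEN INSERT *""".stripMargin)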

    case cmd: CreateIndex if cmd.table.resolved && cmd.columns.exists(_._1.isInstanceOf[UnresolvedFieldName]) =>
      cmd.copy(columns = cmd.columns.map {
        case (u: UnresolvedFieldName, prop) => resolveFieldNames(cmd.table, u.name, u) -> prop
        case other => other
      })
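
      // Illustrative DDL handled by the case above, resolving 'price' from an
      // UnresolvedFieldName (index type names vary by Hudi version; 'column_stats'
      // is an assumption):
      //   spark.sql("CREATE INDEX idx_price ON hudi_tbl USING column_stats(price)")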
  }