def apply()

in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala [1463:1740]


    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      // Don't wait for other rules to resolve the child plans of `InsertIntoStatement`: we need
      // to resolve column "DEFAULT" in the child plans while they are still unresolved.
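      // e.g. "INSERT INTO t VALUES (1, DEFAULT)": "DEFAULT" is replaced with the target column's
      // default value expression.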
      case i: InsertIntoStatement => resolveColumnDefaultInCommandInputQuery(i)

      // Don't wait for other rules to resolve the child plans of `SetVariable`: we need to
      // resolve column "DEFAULT" in the child plans while they are still unresolved.
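      // e.g. "SET VAR v = DEFAULT" resets the variable to its declared default value.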
      case s: SetVariable => resolveColumnDefaultInCommandInputQuery(s)

      // Wait for other rules to resolve child plans first
      case p: LogicalPlan if !p.childrenResolved => p

      // Wait for the rule `DeduplicateRelations` to resolve conflicting attrs first.
      case p: LogicalPlan if hasConflictingAttrs(p) => p

      // If the projection list contains Stars, expand it.
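      // e.g. "SELECT * FROM t" replaces "*" with one attribute per column of t's output.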
      case p: Project if containsStar(p.projectList) =>
        val expanded = p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
        if (expanded.projectList.size < p.projectList.size) {
          checkTrailingCommaInSelect(expanded, starRemoved = true)
        }
        expanded
      // If the filter list contains Stars, expand it.
      case p: Filter if containsStar(Seq(p.condition)) =>
        p.copy(expandStarExpression(p.condition, p.child))
      // If the aggregate function argument contains Stars, expand it.
      case a: Aggregate if containsStar(a.aggregateExpressions) =>
        if (a.groupingExpressions.exists(_.isInstanceOf[UnresolvedOrdinal])) {
          throw QueryCompilationErrors.starNotAllowedWhenGroupByOrdinalPositionUsedError()
        } else {
          val expanded = a.copy(aggregateExpressions =
            buildExpandedProjectList(a.aggregateExpressions, a.child))
          if (expanded.aggregateExpressions.size < a.aggregateExpressions.size) {
            checkTrailingCommaInSelect(expanded, starRemoved = true)
          }
          expanded
        }
      case c: CollectMetrics if containsStar(c.metrics) =>
        c.copy(metrics = buildExpandedProjectList(c.metrics, c.child))
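      // Stars are not allowed as generator arguments, e.g. "SELECT explode(*) FROM t" fails.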
      case g: Generate if containsStar(g.generator.children) =>
        throw QueryCompilationErrors.invalidStarUsageError("explode/json_tuple/UDTF",
          extractStar(g.generator.children))
      // If the Unpivot ids or values contain Stars, expand them.
      case up: Unpivot if up.ids.exists(containsStar) ||
        // Only expand Stars in one-dimensional values
        up.values.exists(values => values.exists(_.length == 1) && values.exists(containsStar)) =>
        up.copy(
          ids = up.ids.map(buildExpandedProjectList(_, up.child)),
          // The inner exprs in Option[[exprs]] are one-dimensional, e.g. Option[[["*"]]].
          // The single NamedExpression may expand into multiple, which we then have to turn into
          // Option[[["col1"], ["col2"]]].
          values = up.values.map(_.flatMap(buildExpandedProjectList(_, up.child)).map(Seq(_)))
        )

      case u @ Union(children, _, _)
        // if there are duplicate output columns, give them unique expr ids
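        // e.g. "SELECT a, a FROM t UNION ALL SELECT b, c FROM s": the left child outputs the
        // same attribute (and thus the same exprId) twice.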
          if children.exists(c => c.output.map(_.exprId).distinct.length < c.output.length) =>
        val newChildren = children.map { c =>
          if (c.output.map(_.exprId).distinct.length < c.output.length) {
            val existingExprIds = mutable.HashSet[ExprId]()
            val projectList = c.output.map { attr =>
              if (existingExprIds.contains(attr.exprId)) {
                // replace non-first duplicates with aliases and tag them
                val newMetadata = new MetadataBuilder().withMetadata(attr.metadata)
                  .putNull("__is_duplicate").build()
                Alias(attr, attr.name)(explicitMetadata = Some(newMetadata))
              } else {
                // leave first duplicate alone
                existingExprIds.add(attr.exprId)
                attr
              }
            }
            Project(projectList, c)
          } else {
            c
          }
        }
        u.withNewChildren(newChildren)

      // A special case for Generate, because the output of Generate should not be resolved by
      // ResolveReferences. Attributes in the output will be resolved by ResolveGenerate.
      case g @ Generate(generator, _, _, _, _, _) if generator.resolved => g

      case g @ Generate(generator, join, outer, qualifier, output, child) =>
        val newG = resolveExpressionByPlanOutput(
          generator, child, throws = true, includeLastResort = true)
        if (newG.fastEquals(generator)) {
          g
        } else {
          Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child)
        }

      case mg: MapGroups if mg.dataOrder.exists(!_.resolved) =>
        // Resolve against `AppendColumns`'s child, instead of `AppendColumns` itself,
        // because `AppendColumns`'s serializer might produce conflicting attribute
        // names, leading to an ambiguous reference exception.
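        // The data order typically comes from sort expressions passed to Dataset APIs such as
        // KeyValueGroupedDataset.flatMapSortedGroups, e.g. .flatMapSortedGroups($"ts")(f).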
        val planForResolve = mg.child match {
          case appendColumns: AppendColumns => appendColumns.child
          case plan => plan
        }
        val resolvedOrder = mg.dataOrder
          .map(resolveExpressionByPlanOutput(_, planForResolve).asInstanceOf[SortOrder])
        mg.copy(dataOrder = resolvedOrder)

      // Left and right sort expressions have to be resolved against their respective
      // child plans only.
      case cg: CoGroup if cg.leftOrder.exists(!_.resolved) || cg.rightOrder.exists(!_.resolved) =>
        // Resolve against each `AppendColumns`'s child, instead of the `AppendColumns` itself,
        // because `AppendColumns`'s serializer might produce conflicting attribute
        // names, leading to an ambiguous reference exception.
        val (leftPlanForResolve, rightPlanForResolve) = Seq(cg.left, cg.right).map {
          case appendColumns: AppendColumns => appendColumns.child
          case plan => plan
        } match {
          case Seq(left, right) => (left, right)
        }

        val resolvedLeftOrder = cg.leftOrder
          .map(resolveExpressionByPlanOutput(_, leftPlanForResolve).asInstanceOf[SortOrder])
        val resolvedRightOrder = cg.rightOrder
          .map(resolveExpressionByPlanOutput(_, rightPlanForResolve).asInstanceOf[SortOrder])

        cg.copy(leftOrder = resolvedLeftOrder, rightOrder = resolvedRightOrder)

      // Skip plans that contain deserializer expressions, as they should be resolved by another
      // rule: ResolveDeserializer.
      case plan if containsDeserializer(plan.expressions) => plan

      case a: Aggregate => resolveReferencesInAggregate(a)

      // Special case for Project as it supports lateral column alias.
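      // For example, in "SELECT salary * 2 AS doubled, doubled + 1 FROM t", the second item's
      // "doubled" resolves to the lateral alias defined earlier in the same projection list.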
      case p: Project =>
        val resolvedBasic = p.projectList.map(resolveExpressionByPlanChildren(_, p))
        // Lateral column aliases have higher priority than outer references.
        val resolvedWithLCA = resolveLateralColumnAlias(resolvedBasic)
        val resolvedFinal = resolvedWithLCA.map(resolveColsLastResort)
        p.copy(projectList = resolvedFinal.map(_.asInstanceOf[NamedExpression]))

      case o: OverwriteByExpression if o.table.resolved =>
        // The delete condition of `OverwriteByExpression` will be passed to the table
        // implementation and should be resolved based on the table schema.
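        // For example, the condition in df.writeTo("t").overwrite($"p" === 1) refers to the
        // table's own "p" column rather than anything in the query.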
        o.copy(deleteExpr = resolveExpressionByPlanOutput(o.deleteExpr, o.table))

      case u: UpdateTable => resolveReferencesInUpdate(u)

      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _)
        if !m.resolved && targetTable.resolved && sourceTable.resolved =>
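        // e.g. "MERGE INTO target USING source ON target.id = source.id
        //   WHEN MATCHED THEN UPDATE SET *
        //   WHEN NOT MATCHED THEN INSERT *"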

        EliminateSubqueryAliases(targetTable) match {
          case r: NamedRelation if r.skipSchemaResolution =>
            // Do not resolve the expression if the target table accepts any schema.
            // This allows data sources to customize their own resolution logic using
            // custom resolution rules.
            m

          case _ =>
            val newMatchedActions = m.matchedActions.map {
              case DeleteAction(deleteCondition) =>
                val resolvedDeleteCondition = deleteCondition.map(
                  resolveExpressionByPlanChildren(_, m))
                DeleteAction(resolvedDeleteCondition)
              case UpdateAction(updateCondition, assignments) =>
                val resolvedUpdateCondition = updateCondition.map(
                  resolveExpressionByPlanChildren(_, m))
                UpdateAction(
                  resolvedUpdateCondition,
                  // The update value can access columns from both target and source tables.
                  resolveAssignments(assignments, m, MergeResolvePolicy.BOTH))
              case UpdateStarAction(updateCondition) =>
                val assignments = targetTable.output.map { attr =>
                  Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
                }
                UpdateAction(
                  updateCondition.map(resolveExpressionByPlanChildren(_, m)),
                  // For UPDATE *, the value must come from the source table.
                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
              case o => o
            }
            val newNotMatchedActions = m.notMatchedActions.map {
              case InsertAction(insertCondition, assignments) =>
                // The insert action is used when not matched, so its condition and value can only
                // access columns from the source table.
                val resolvedInsertCondition = insertCondition.map(
                  resolveExpressionByPlanOutput(_, m.sourceTable))
                InsertAction(
                  resolvedInsertCondition,
                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
              case InsertStarAction(insertCondition) =>
                // The insert action is used when not matched, so its condition and value can only
                // access columns from the source table.
                val resolvedInsertCondition = insertCondition.map(
                  resolveExpressionByPlanOutput(_, m.sourceTable))
                val assignments = targetTable.output.map { attr =>
                  Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
                }
                InsertAction(
                  resolvedInsertCondition,
                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
              case o => o
            }
            val newNotMatchedBySourceActions = m.notMatchedBySourceActions.map {
              case DeleteAction(deleteCondition) =>
                val resolvedDeleteCondition = deleteCondition.map(
                  resolveExpressionByPlanOutput(_, targetTable))
                DeleteAction(resolvedDeleteCondition)
              case UpdateAction(updateCondition, assignments) =>
                val resolvedUpdateCondition = updateCondition.map(
                  resolveExpressionByPlanOutput(_, targetTable))
                UpdateAction(
                  resolvedUpdateCondition,
                  // The update value can access columns from the target table only.
                  resolveAssignments(assignments, m, MergeResolvePolicy.TARGET))
              case o => o
            }

            val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m)
            m.copy(mergeCondition = resolvedMergeCondition,
              matchedActions = newMatchedActions,
              notMatchedActions = newNotMatchedActions,
              notMatchedBySourceActions = newNotMatchedBySourceActions)
        }

      // UnresolvedHaving can host grouping expressions and aggregate functions. We should resolve
      // columns with `agg.output` and the rule `ResolveAggregateFunctions` will push them down to
      // Aggregate later.
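      // For example, in "SELECT sum(a) FROM t GROUP BY b HAVING b = 1 AND sum(a) > 0", both the
      // grouping column and the aggregate function in HAVING are resolved here and pushed down
      // into the Aggregate by `ResolveAggregateFunctions`.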
      case u @ UnresolvedHaving(cond, agg: Aggregate) if !cond.resolved =>
        u.mapExpressions { e =>
          // Columns in HAVING should be resolved with `agg.child.output` first, to follow the SQL
          // standard. See more details in SPARK-31519.
          val resolvedWithAgg = resolveColWithAgg(e, agg)
          resolveExpressionByPlanChildren(resolvedWithAgg, u, includeLastResort = true)
        }

      // RepartitionByExpression can host missing attributes that are from a descendant node.
      // For example, `spark.table("t").select($"a").repartition($"b")`. We can resolve `b` with
      // table `t` even if there is a Project node between the table scan node and the
      // RepartitionByExpression node. We also need to propagate the missing attributes from the
      // descendant node to the current node, and project them away at the end via an extra Project.
      case r @ RepartitionByExpression(partitionExprs, child, _, _)
        if !r.resolved || r.missingInput.nonEmpty =>
        val resolvedBasic = partitionExprs.map(resolveExpressionByPlanChildren(_, r))
        val (newPartitionExprs, newChild) = resolveExprsAndAddMissingAttrs(resolvedBasic, child)
        // Missing columns should be resolved right after basic column resolution.
        // See the doc of `ResolveReferences`.
        val resolvedFinal = newPartitionExprs.map(resolveColsLastResort)
        if (child.output == newChild.output) {
          r.copy(resolvedFinal, newChild)
        } else {
          Project(child.output, r.copy(resolvedFinal, newChild))
        }

      // Filter can host both grouping expressions/aggregate functions and missing attributes.
      // Resolution of grouping expressions/aggregate functions takes precedence over missing
      // attributes. See the classdoc of `ResolveReferences` for details.
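      // For example, df.select($"a").where($"b" > 0) filters on "b", which is missing from the
      // projected output; "b" is resolved from the child and projected away again below.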
      case f @ Filter(cond, child) if !cond.resolved || f.missingInput.nonEmpty =>
        val resolvedBasic = resolveExpressionByPlanChildren(cond, f)
        val resolvedWithAgg = resolveColWithAgg(resolvedBasic, child)
        val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(resolvedWithAgg), child)
        // Missing columns should be resolved right after basic column resolution.
        // See the doc of `ResolveReferences`.
        val resolvedFinal = resolveColsLastResort(newCond.head)
        if (child.output == newChild.output) {
          f.copy(condition = resolvedFinal)
        } else {
          // Add missing attributes and then project them away.
          val newFilter = Filter(resolvedFinal, newChild)
          Project(child.output, newFilter)
        }

      case s: Sort if !s.resolved || s.missingInput.nonEmpty =>
        resolveReferencesInSort(s)

      // Skip ExecuteImmediateQuery, as its arguments will be resolved by
      // [[SubstituteExecuteImmediate]].
      case e: ExecuteImmediateQuery => e

      case q: LogicalPlan =>
        logTrace(s"Attempting to resolve ${q.simpleString(conf.maxToStringFields)}")
        q.mapExpressions(resolveExpressionByPlanChildren(_, q, includeLastResort = true))
    }