in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala [1463:1740]
/**
 * Resolves column references in the operators of `plan`, visiting operators bottom-up via
 * `resolveOperatorsUp`.
 *
 * The cases below form an ordered partial function, so their order is significant. Some
 * examples:
 *  - `InsertIntoStatement`/`SetVariable` are handled before the "children not yet resolved"
 *    guard.
 *  - Star expansion happens before basic column resolution.
 *  - Plans containing deserializer expressions are skipped before the generic fallback.
 *
 * @param plan the logical plan whose column references should be resolved.
 * @return the plan with the references that could be resolved in this pass resolved.
 */
def apply(plan: LogicalPlan): LogicalPlan = {

  // `MapGroups`/`CoGroup` sort orders are resolved against the child of an `AppendColumns`
  // instead of the `AppendColumns` itself, because the `AppendColumns` serializer might
  // produce conflicting attribute names, leading to an ambiguous-reference exception.
  def planForSortOrderResolution(child: LogicalPlan): LogicalPlan = child match {
    case appendColumns: AppendColumns => appendColumns.child
    case other => other
  }

  // True when at least two attributes in `p.output` share the same expression id.
  def hasDuplicateExprIds(p: LogicalPlan): Boolean = {
    val output = p.output
    output.map(_.exprId).distinct.length < output.length
  }

  // Builds one `column = <unresolved column with the same name>` assignment per output column
  // of `targetTable`. Used to expand MERGE's `UPDATE *` / `INSERT *` actions; the values are
  // resolved afterwards by `resolveAssignments`.
  def assignmentsByColumnName(targetTable: LogicalPlan): Seq[Assignment] =
    targetTable.output.map { attr =>
      Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
    }

  plan.resolveOperatorsUp {
    // Don't wait for other rules to resolve the child plans of `InsertIntoStatement`: column
    // "DEFAULT" in the child plans is resolved here, which requires them to still be unresolved.
    case i: InsertIntoStatement => resolveColumnDefaultInCommandInputQuery(i)
    // Don't wait for other rules to resolve the child plans of `SetVariable`: column
    // "DEFAULT" in the child plans is resolved here, which requires them to still be unresolved.
    case s: SetVariable => resolveColumnDefaultInCommandInputQuery(s)
    // Wait for other rules to resolve child plans first.
    case p: LogicalPlan if !p.childrenResolved => p
    // Wait for the rule `DeduplicateRelations` to resolve conflicting attrs first.
    case p: LogicalPlan if hasConflictingAttrs(p) => p
    // If the projection list contains Stars, expand it.
    case p: Project if containsStar(p.projectList) =>
      val expanded = p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
      // A star that expanded to nothing shrinks the list; check for a dangling trailing comma.
      if (expanded.projectList.size < p.projectList.size) {
        checkTrailingCommaInSelect(expanded, starRemoved = true)
      }
      expanded
    // If the filter condition contains Stars, expand it.
    case p: Filter if containsStar(Seq(p.condition)) =>
      p.copy(expandStarExpression(p.condition, p.child))
    // If the aggregate function argument contains Stars, expand it.
    case a: Aggregate if containsStar(a.aggregateExpressions) =>
      if (a.groupingExpressions.exists(_.isInstanceOf[UnresolvedOrdinal])) {
        throw QueryCompilationErrors.starNotAllowedWhenGroupByOrdinalPositionUsedError()
      } else {
        val expanded = a.copy(aggregateExpressions =
          buildExpandedProjectList(a.aggregateExpressions, a.child))
        if (expanded.aggregateExpressions.size < a.aggregateExpressions.size) {
          checkTrailingCommaInSelect(expanded, starRemoved = true)
        }
        expanded
      }
    case c: CollectMetrics if containsStar(c.metrics) =>
      c.copy(metrics = buildExpandedProjectList(c.metrics, c.child))
    case g: Generate if containsStar(g.generator.children) =>
      throw QueryCompilationErrors.invalidStarUsageError("explode/json_tuple/UDTF",
        extractStar(g.generator.children))
    // If the Unpivot ids or values contain Stars, expand them.
    case up: Unpivot if up.ids.exists(containsStar) ||
      // Only expand Stars in one-dimensional values
      up.values.exists(values => values.exists(_.length == 1) && values.exists(containsStar)) =>
      up.copy(
        ids = up.ids.map(buildExpandedProjectList(_, up.child)),
        // The inner exprs of `values` are one-dimensional, e.g. Option([["*"]]).
        // Each single NamedExpression may expand into multiple ones, which we turn back
        // into one-dimensional values here, e.g. Option([["col1"], ["col2"]]).
        values = up.values.map(_.flatMap(buildExpandedProjectList(_, up.child)).map(Seq(_)))
      )
    // If a child has duplicate output columns, give the non-first duplicates unique expr ids.
    case u @ Union(children, _, _) if children.exists(hasDuplicateExprIds) =>
      val newChildren = children.map { c =>
        if (hasDuplicateExprIds(c)) {
          val existingExprIds = mutable.HashSet[ExprId]()
          val projectList = c.output.map { attr =>
            if (existingExprIds.contains(attr.exprId)) {
              // Replace non-first duplicates with aliases and tag them.
              val newMetadata = new MetadataBuilder().withMetadata(attr.metadata)
                .putNull("__is_duplicate").build()
              Alias(attr, attr.name)(explicitMetadata = Some(newMetadata))
            } else {
              // Leave the first occurrence alone.
              existingExprIds.add(attr.exprId)
              attr
            }
          }
          Project(projectList, c)
        } else {
          c
        }
      }
      u.withNewChildren(newChildren)
    // A special case for Generate, because the output of Generate should not be resolved by
    // ResolveReferences. Attributes in the output will be resolved by ResolveGenerate.
    case g @ Generate(generator, _, _, _, _, _) if generator.resolved => g
    case g @ Generate(generator, join, outer, qualifier, output, child) =>
      val newG = resolveExpressionByPlanOutput(
        generator, child, throws = true, includeLastResort = true)
      if (newG.fastEquals(generator)) {
        g
      } else {
        Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child)
      }
    case mg: MapGroups if mg.dataOrder.exists(!_.resolved) =>
      val planForResolve = planForSortOrderResolution(mg.child)
      val resolvedOrder = mg.dataOrder
        .map(resolveExpressionByPlanOutput(_, planForResolve).asInstanceOf[SortOrder])
      mg.copy(dataOrder = resolvedOrder)
    // Left and right sort expression have to be resolved against the respective child plan only
    case cg: CoGroup if cg.leftOrder.exists(!_.resolved) || cg.rightOrder.exists(!_.resolved) =>
      val leftPlanForResolve = planForSortOrderResolution(cg.left)
      val rightPlanForResolve = planForSortOrderResolution(cg.right)
      val resolvedLeftOrder = cg.leftOrder
        .map(resolveExpressionByPlanOutput(_, leftPlanForResolve).asInstanceOf[SortOrder])
      val resolvedRightOrder = cg.rightOrder
        .map(resolveExpressionByPlanOutput(_, rightPlanForResolve).asInstanceOf[SortOrder])
      cg.copy(leftOrder = resolvedLeftOrder, rightOrder = resolvedRightOrder)
    // Skips plan which contains deserializer expressions, as they should be resolved by another
    // rule: ResolveDeserializer.
    case plan if containsDeserializer(plan.expressions) => plan
    case a: Aggregate => resolveReferencesInAggregate(a)
    // Special case for Project as it supports lateral column alias.
    case p: Project =>
      val resolvedBasic = p.projectList.map(resolveExpressionByPlanChildren(_, p))
      // Lateral column alias has higher priority than outer reference.
      val resolvedWithLCA = resolveLateralColumnAlias(resolvedBasic)
      val resolvedFinal = resolvedWithLCA.map(resolveColsLastResort)
      p.copy(projectList = resolvedFinal.map(_.asInstanceOf[NamedExpression]))
    case o: OverwriteByExpression if o.table.resolved =>
      // The delete condition of `OverwriteByExpression` will be passed to the table
      // implementation and should be resolved based on the table schema.
      o.copy(deleteExpr = resolveExpressionByPlanOutput(o.deleteExpr, o.table))
    case u: UpdateTable => resolveReferencesInUpdate(u)
    case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _)
      if !m.resolved && targetTable.resolved && sourceTable.resolved =>
      EliminateSubqueryAliases(targetTable) match {
        case r: NamedRelation if r.skipSchemaResolution =>
          // Do not resolve the expression if the target table accepts any schema.
          // This allows data sources to customize their own resolution logic using
          // custom resolution rules.
          m
        case _ =>
          val newMatchedActions = m.matchedActions.map {
            case DeleteAction(deleteCondition) =>
              val resolvedDeleteCondition = deleteCondition.map(
                resolveExpressionByPlanChildren(_, m))
              DeleteAction(resolvedDeleteCondition)
            case UpdateAction(updateCondition, assignments) =>
              val resolvedUpdateCondition = updateCondition.map(
                resolveExpressionByPlanChildren(_, m))
              UpdateAction(
                resolvedUpdateCondition,
                // The update value can access columns from both target and source tables.
                resolveAssignments(assignments, m, MergeResolvePolicy.BOTH))
            case UpdateStarAction(updateCondition) =>
              val assignments = assignmentsByColumnName(targetTable)
              UpdateAction(
                updateCondition.map(resolveExpressionByPlanChildren(_, m)),
                // For UPDATE *, the value must be from source table.
                resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
            case o => o
          }
          val newNotMatchedActions = m.notMatchedActions.map {
            case InsertAction(insertCondition, assignments) =>
              // The insert action is used when not matched, so its condition and value can only
              // access columns from the source table.
              val resolvedInsertCondition = insertCondition.map(
                resolveExpressionByPlanOutput(_, m.sourceTable))
              InsertAction(
                resolvedInsertCondition,
                resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
            case InsertStarAction(insertCondition) =>
              // The insert action is used when not matched, so its condition and value can only
              // access columns from the source table.
              val resolvedInsertCondition = insertCondition.map(
                resolveExpressionByPlanOutput(_, m.sourceTable))
              val assignments = assignmentsByColumnName(targetTable)
              InsertAction(
                resolvedInsertCondition,
                resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
            case o => o
          }
          val newNotMatchedBySourceActions = m.notMatchedBySourceActions.map {
            case DeleteAction(deleteCondition) =>
              val resolvedDeleteCondition = deleteCondition.map(
                resolveExpressionByPlanOutput(_, targetTable))
              DeleteAction(resolvedDeleteCondition)
            case UpdateAction(updateCondition, assignments) =>
              val resolvedUpdateCondition = updateCondition.map(
                resolveExpressionByPlanOutput(_, targetTable))
              UpdateAction(
                resolvedUpdateCondition,
                // The update value can access columns from the target table only.
                resolveAssignments(assignments, m, MergeResolvePolicy.TARGET))
            case o => o
          }
          val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m)
          m.copy(mergeCondition = resolvedMergeCondition,
            matchedActions = newMatchedActions,
            notMatchedActions = newNotMatchedActions,
            notMatchedBySourceActions = newNotMatchedBySourceActions)
      }
    // UnresolvedHaving can host grouping expressions and aggregate functions. We should resolve
    // columns with `agg.output` and the rule `ResolveAggregateFunctions` will push them down to
    // Aggregate later.
    case u @ UnresolvedHaving(cond, agg: Aggregate) if !cond.resolved =>
      u.mapExpressions { e =>
        // Columns in HAVING should be resolved with `agg.child.output` first, to follow the SQL
        // standard. See more details in SPARK-31519.
        val resolvedWithAgg = resolveColWithAgg(e, agg)
        resolveExpressionByPlanChildren(resolvedWithAgg, u, includeLastResort = true)
      }
    // RepartitionByExpression can host missing attributes that are from a descendant node.
    // For example, `spark.table("t").select($"a").repartition($"b")`. We can resolve `b` with
    // table `t` even if there is a Project node between the table scan node and the
    // RepartitionByExpression node. We also need to propagate the missing attributes from the
    // descendant node to the current node, and project them away at the end via an extra Project.
    case r @ RepartitionByExpression(partitionExprs, child, _, _)
      if !r.resolved || r.missingInput.nonEmpty =>
      val resolvedBasic = partitionExprs.map(resolveExpressionByPlanChildren(_, r))
      val (newPartitionExprs, newChild) = resolveExprsAndAddMissingAttrs(resolvedBasic, child)
      // Missing columns should be resolved right after basic column resolution.
      // See the doc of `ResolveReferences`.
      val resolvedFinal = newPartitionExprs.map(resolveColsLastResort)
      if (child.output == newChild.output) {
        r.copy(resolvedFinal, newChild)
      } else {
        Project(child.output, r.copy(resolvedFinal, newChild))
      }
    // Filter can host both grouping expressions/aggregate functions and missing attributes.
    // The grouping expressions/aggregate functions resolution takes precedence over missing
    // attributes. See the classdoc of `ResolveReferences` for details.
    case f @ Filter(cond, child) if !cond.resolved || f.missingInput.nonEmpty =>
      val resolvedBasic = resolveExpressionByPlanChildren(cond, f)
      val resolvedWithAgg = resolveColWithAgg(resolvedBasic, child)
      val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(resolvedWithAgg), child)
      // Missing columns should be resolved right after basic column resolution.
      // See the doc of `ResolveReferences`.
      val resolvedFinal = resolveColsLastResort(newCond.head)
      if (child.output == newChild.output) {
        f.copy(condition = resolvedFinal)
      } else {
        // Add missing attributes and then project them away.
        val newFilter = Filter(resolvedFinal, newChild)
        Project(child.output, newFilter)
      }
    case s: Sort if !s.resolved || s.missingInput.nonEmpty =>
      resolveReferencesInSort(s)
    // Pass for Execute Immediate as arguments will be resolved by [[SubstituteExecuteImmediate]].
    case e: ExecuteImmediateQuery => e
    case q: LogicalPlan =>
      logTrace(s"Attempting to resolve ${q.simpleString(conf.maxToStringFields)}")
      q.mapExpressions(resolveExpressionByPlanChildren(_, q, includeLastResort = true))
  }
}