in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala [47:280]
/**
 * Walks `plan` top-down (only into subtrees containing the CTE pattern) and resolves CTE
 * definitions held by [[WithCTE]] nodes as well as [[CTERelationRef]]s pointing at them.
 * Subquery expressions are handled by recursing into their plans.
 *
 * For every definition of a `WithCTE`:
 *  - A definition with no self reference expressed as a `CTERelationRef` is kept unchanged.
 *    If it is resolved, it is recorded in `cteDefMap`.
 *  - A definition with a self reference must have one of the supported recursive shapes:
 *    a `Union` directly under a `SubqueryAlias`, optionally wrapped by an
 *    `UnresolvedSubqueryColumnAliases` and/or an inner `WithCTE`.
 *    - Once the anchor side of the `Union` is resolved, the `Union` is replaced by a
 *      [[UnionLoop]]. Its recursive side has its self references rewritten by
 *      `rewriteRecursiveCTERefs`, and so do the inner CTE definitions, if any.
 *    - A `Distinct(Union(...))` (UNION rather than UNION ALL) currently fails analysis with
 *      `UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE`.
 *    - Any other shape fails with `INVALID_RECURSIVE_CTE`.
 *
 * An unresolved `CTERelationRef` whose definition is present in `cteDefMap` is replaced by a
 * reference carrying that definition's resolution state, output, streaming flag and max rows.
 *
 * @param plan      the plan to process.
 * @param cteDefMap CTE definitions keyed by CTE id; resolved definitions are added to it
 *                  as they are encountered.
 * @return the plan with CTE definitions/references rewritten where possible.
 */
private def resolveWithCTE(
    plan: LogicalPlan,
    cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
  plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
    case withCTE @ WithCTE(_, cteDefs) =>
      val newCTEDefs = cteDefs.map {
        // cteDef in the first case is either recursive and all the recursive CTERelationRefs
        // are already substituted to UnionLoopRef in the previous pass, or it is not recursive
        // at all. In both cases we need to put it in the map in case it is resolved.
        // Second case is performing the substitution of recursive CTERelationRefs.
        case cteDef if !cteDef.hasSelfReferenceAsCTERef =>
          if (cteDef.resolved) {
            cteDefMap.put(cteDef.id, cteDef)
          }
          cteDef
        case cteDef =>
          // Multiple self-references are not allowed within one cteDef.
          checkNumberOfSelfReferences(cteDef)
          cteDef.child match {
            // If it is a supported recursive CTE query pattern, extract the anchor and
            // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE
            // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef,
            // using the output of the resolved anchor plan. The side effect of recursive
            // CTERelationRef->UnionLoopRef substitution is that `cteDef` that was originally
            // considered `recursive` is no more in the context of `cteDef.recursive` method
            // definition.
            //
            // Simple case of duplicating (UNION ALL) clause.
            case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
              if (!anchor.resolved) {
                cteDef
              } else {
                val loop = UnionLoop(
                  cteDef.id,
                  anchor,
                  rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
                cteDef.copy(child = alias.copy(child = loop))
              }
            // Duplicating (UNION ALL) clause whose body has its own inner WITH clause.
            case alias @ SubqueryAlias(_, withCTE @ WithCTE(
                Union(Seq(anchor, recursion), false, false), innerCteDefs)) =>
              if (!anchor.resolved) {
                cteDef
              } else {
                // We need to check whether any of the inner CTEs has a self reference and
                // replace it if needed.
                val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                  innerCteDef.copy(child = rewriteRecursiveCTERefs(
                    innerCteDef.child, anchor, cteDef.id, None))
                }
                val loop = UnionLoop(
                  cteDef.id,
                  anchor,
                  rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
                cteDef.copy(child = alias.copy(child = withCTE.copy(
                  plan = loop, cteDefs = newInnerCteDefs)))
              }
            // The case of CTE name followed by a parenthesized list of column name(s), eg.
            // WITH RECURSIVE t(n).
            case alias @ SubqueryAlias(_,
                columnAlias @ UnresolvedSubqueryColumnAliases(
                  colNames,
                  Union(Seq(anchor, recursion), false, false)
                )) =>
              if (!anchor.resolved) {
                cteDef
              } else {
                val loop = UnionLoop(
                  cteDef.id,
                  anchor,
                  rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
                cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
              }
            // Column-name list (eg. WITH RECURSIVE t(n)) combined with an inner WITH clause.
            case alias @ SubqueryAlias(_,
                columnAlias @ UnresolvedSubqueryColumnAliases(
                  colNames,
                  withCTE @ WithCTE(Union(Seq(anchor, recursion), false, false), innerCteDefs)
                )) =>
              if (!anchor.resolved) {
                cteDef
              } else {
                // We need to check whether any of the inner CTEs has a self reference and
                // replace it if needed.
                val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                  innerCteDef.copy(child = rewriteRecursiveCTERefs(
                    innerCteDef.child, anchor, cteDef.id, Some(colNames)))
                }
                val loop = UnionLoop(
                  cteDef.id,
                  anchor,
                  rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
                cteDef.copy(child = alias.copy(child = columnAlias.copy(
                  child = withCTE.copy(plan = loop, cteDefs = newInnerCteDefs))))
              }
            // If the recursion is described with a UNION (deduplicating) clause then the
            // recursive term should not return those rows that have been calculated previously,
            // and we exclude those rows from the current iteration result.
            // NOTE: failAnalysis throws, so the UnionLoop rewrite after it is currently
            // unreachable in this and the following three Distinct cases (presumably kept as
            // the intended implementation once UNION is supported).
            case alias @ SubqueryAlias(_,
                Distinct(Union(Seq(anchor, recursion), false, false))) =>
              cteDef.failAnalysis(
                errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                messageParameters = Map.empty)
              if (!anchor.resolved) {
                cteDef
              } else {
                val loop = UnionLoop(
                  cteDef.id,
                  Distinct(anchor),
                  Except(
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None),
                    UnionLoopRef(cteDef.id, anchor.output, true),
                    isAll = false
                  )
                )
                cteDef.copy(child = alias.copy(child = loop))
              }
            // UNION case with CTEs inside.
            case alias @ SubqueryAlias(_, withCTE @ WithCTE(
                Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs)) =>
              cteDef.failAnalysis(
                errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                messageParameters = Map.empty)
              if (!anchor.resolved) {
                cteDef
              } else {
                // We need to check whether any of the inner CTEs has a self reference and
                // replace it if needed.
                val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                  innerCteDef.copy(child = rewriteRecursiveCTERefs(
                    innerCteDef.child, anchor, cteDef.id, None))
                }
                val loop = UnionLoop(
                  cteDef.id,
                  Distinct(anchor),
                  Except(
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None),
                    UnionLoopRef(cteDef.id, anchor.output, true),
                    isAll = false
                  )
                )
                cteDef.copy(child = alias.copy(child = withCTE.copy(
                  plan = loop, cteDefs = newInnerCteDefs)))
              }
            // UNION case with a parenthesized list of column name(s).
            case alias @ SubqueryAlias(_,
                columnAlias @ UnresolvedSubqueryColumnAliases(
                  colNames,
                  Distinct(Union(Seq(anchor, recursion), false, false))
                )) =>
              cteDef.failAnalysis(
                errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                messageParameters = Map.empty)
              if (!anchor.resolved) {
                cteDef
              } else {
                val loop = UnionLoop(
                  cteDef.id,
                  Distinct(anchor),
                  Except(
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)),
                    UnionLoopRef(cteDef.id, anchor.output, true),
                    isAll = false
                  )
                )
                cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
              }
            // UNION case with a parenthesized list of column name(s) and CTEs inside.
            // The inner WithCTE must be bound here: without the binder, `withCTE` below would
            // refer to the outer WithCTE matched at the top of this rule and copy the wrong node.
            case alias @ SubqueryAlias(_,
                columnAlias @ UnresolvedSubqueryColumnAliases(
                  colNames,
                  withCTE @ WithCTE(
                    Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs)
                )) =>
              cteDef.failAnalysis(
                errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                messageParameters = Map.empty)
              if (!anchor.resolved) {
                cteDef
              } else {
                // We need to check whether any of the inner CTEs has a self reference and
                // replace it if needed.
                val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                  innerCteDef.copy(child = rewriteRecursiveCTERefs(
                    innerCteDef.child, anchor, cteDef.id, Some(colNames)))
                }
                val loop = UnionLoop(
                  cteDef.id,
                  Distinct(anchor),
                  Except(
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)),
                    UnionLoopRef(cteDef.id, anchor.output, true),
                    isAll = false
                  )
                )
                cteDef.copy(child = alias.copy(child = columnAlias.copy(
                  child = withCTE.copy(plan = loop, cteDefs = newInnerCteDefs))))
              }
            case _ =>
              // We do not support cases of sole Union (needs a SubqueryAlias above it), nor
              // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the
              // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias->
              // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union.
              throw new AnalysisException(
                errorClass = "INVALID_RECURSIVE_CTE",
                messageParameters = Map.empty)
          }
      }
      withCTE.copy(cteDefs = newCTEDefs)
    // This is a non-recursive reference to a definition.
    case ref: CTERelationRef if !ref.resolved =>
      cteDefMap.get(ref.cteId).map { cteDef =>
        // cteDef is certainly resolved, otherwise it would not have been in the map.
        CTERelationRef(
          cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming, maxRows = cteDef.maxRows)
      }.getOrElse {
        ref
      }
    case other =>
      // Recurse into subquery expressions that contain CTE patterns.
      other.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
        case e: SubqueryExpression => e.withNewPlan(resolveWithCTE(e.plan, cteDefMap))
      }
  }
}