private def resolveWithCTE()

in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala [47:280]


  /**
   * Resolves CTE definitions and CTE references inside `plan`.
   *
   * The plan is traversed top-down and only subtrees containing the `CTE` pattern are visited:
   *  - For each [[WithCTE]] node every definition is inspected. A definition without a
   *    self-reference (as a `CTERelationRef`) is stored in `cteDefMap` once it is resolved and is
   *    returned unchanged. A definition with a self-reference is validated with
   *    `checkNumberOfSelfReferences` and, if its shape is one of the recognised
   *    `SubqueryAlias -> [UnresolvedSubqueryColumnAliases ->] [WithCTE ->] Union` patterns and its
   *    anchor is resolved, its `Union` is replaced by a `UnionLoop` whose recursive side is
   *    produced by `rewriteRecursiveCTERefs`. While the anchor is unresolved the definition is
   *    left untouched.
   *  - An unresolved [[CTERelationRef]] whose id is present in `cteDefMap` is replaced by a new
   *    reference built from the stored definition; otherwise it is left as is.
   *  - For any other node, the plans of its subquery expressions are resolved recursively with
   *    the same `cteDefMap`.
   *
   * @param plan the logical plan to process
   * @param cteDefMap mutable map from CTE id to definition; it is read when resolving references
   *                  and updated in place with every resolved, non-self-referencing definition
   *                  encountered
   * @return the plan with the rewrites described above applied
   * @throws AnalysisException with error class `INVALID_RECURSIVE_CTE` when a self-referencing
   *                           definition does not match any of the recognised shapes
   */
  private def resolveWithCTE(
      plan: LogicalPlan,
      cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
    plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
      case withCTE @ WithCTE(_, cteDefs) =>
        val newCTEDefs = cteDefs.map {
          // cteDef in the first case is either recursive and all the recursive CTERelationRefs
          // are already substituted to UnionLoopRef in the previous pass, or it is not recursive
          // at all. In both cases we need to put it in the map in case it is resolved.
          // Second case is performing the substitution of recursive CTERelationRefs.
          case cteDef if !cteDef.hasSelfReferenceAsCTERef =>
            if (cteDef.resolved) {
              cteDefMap.put(cteDef.id, cteDef)
            }
            cteDef
          case cteDef =>
            // Multiple self-references are not allowed within one cteDef.
            checkNumberOfSelfReferences(cteDef)
            cteDef.child match {
              // If it is a supported recursive CTE query pattern (4 so far), extract the anchor and
              // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE
              // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef,
              // using the output of the resolved anchor plan. The side effect of recursive
              // CTERelationRef->UnionLoopRef substitution is that `cteDef` that was originally
              // considered `recursive` is no more in the context of `cteDef.recursive` method
              // definition.
              //
              // NOTE: a WithCTE nested inside the definition is always bound as `innerWithCTE` so
              // that it can never be confused with the enclosing `withCTE` matched above.
              //
              // Simple case of duplicating (UNION ALL) clause.
              case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
                if (!anchor.resolved) {
                  cteDef
                } else {
                  val loop = UnionLoop(
                    cteDef.id,
                    anchor,
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
                  cteDef.copy(child = alias.copy(child = loop))
                }

              // UNION ALL case with CTEs inside.
              case alias @ SubqueryAlias(_, innerWithCTE @ WithCTE(
                Union(Seq(anchor, recursion), false, false), innerCteDefs)) =>
                if (!anchor.resolved) {
                  cteDef
                } else {
                  // We need to check whether any of the inner CTEs has a self reference and replace
                  // it if needed
                  val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                    innerCteDef.copy(child = rewriteRecursiveCTERefs(
                      innerCteDef.child, anchor, cteDef.id, None))
                  }
                  val loop = UnionLoop(
                    cteDef.id,
                    anchor,
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
                  cteDef.copy(child = alias.copy(child = innerWithCTE.copy(
                    plan = loop, cteDefs = newInnerCteDefs)))
                }

              // The case of CTE name followed by a parenthesized list of column name(s), eg.
              // WITH RECURSIVE t(n).
              case alias @ SubqueryAlias(_,
                  columnAlias @ UnresolvedSubqueryColumnAliases(
                  colNames,
                  Union(Seq(anchor, recursion), false, false)
                )) =>
                if (!anchor.resolved) {
                  cteDef
                } else {
                  val loop = UnionLoop(
                    cteDef.id,
                    anchor,
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
                  cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
                }

              // The case of CTE name followed by a parenthesized list of column name(s), eg.
              // WITH RECURSIVE t(n), with CTEs inside.
              case alias @ SubqueryAlias(_,
                   columnAlias @ UnresolvedSubqueryColumnAliases(
                   colNames,
                   innerWithCTE @ WithCTE(
                     Union(Seq(anchor, recursion), false, false), innerCteDefs)
                )) =>
                if (!anchor.resolved) {
                  cteDef
                } else {
                  // We need to check whether any of the inner CTEs has a self reference and replace
                  // it if needed
                  val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                    innerCteDef.copy(child = rewriteRecursiveCTERefs(
                      innerCteDef.child, anchor, cteDef.id, Some(colNames)))
                  }
                  val loop = UnionLoop(
                    cteDef.id,
                    anchor,
                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
                  cteDef.copy(child = alias.copy(child = columnAlias.copy(
                    child = innerWithCTE.copy(plan = loop, cteDefs = newInnerCteDefs))))
                }

              // If the recursion is described with a UNION (deduplicating) clause then the
              // recursive term should not return those rows that have been calculated previously,
              // and we exclude those rows from the current iteration result.
              //
              // NOTE(review): every Distinct(Union) case below first calls `failAnalysis` with
              // UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE. That call is expected to throw, which would
              // make the rewrite that follows it unreachable for now - confirm against the
              // definition of `failAnalysis`.
              case alias @ SubqueryAlias(_,
                  Distinct(Union(Seq(anchor, recursion), false, false))) =>
                cteDef.failAnalysis(
                  errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                  messageParameters = Map.empty)
                if (!anchor.resolved) {
                  cteDef
                } else {
                  val loop = UnionLoop(
                    cteDef.id,
                    Distinct(anchor),
                    Except(
                      rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None),
                      UnionLoopRef(cteDef.id, anchor.output, true),
                      isAll = false
                    )
                  )
                  cteDef.copy(child = alias.copy(child = loop))
                }

              // UNION case with CTEs inside.
              case alias @ SubqueryAlias(_, innerWithCTE @ WithCTE(
                   Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs)) =>
                cteDef.failAnalysis(
                  errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                  messageParameters = Map.empty)
                if (!anchor.resolved) {
                  cteDef
                } else {
                  // We need to check whether any of the inner CTEs has a self reference and replace
                  // it if needed
                  val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                    innerCteDef.copy(child = rewriteRecursiveCTERefs(
                      innerCteDef.child, anchor, cteDef.id, None))
                  }
                  val loop = UnionLoop(
                    cteDef.id,
                    Distinct(anchor),
                    Except(
                      rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None),
                      UnionLoopRef(cteDef.id, anchor.output, true),
                      isAll = false
                    )
                  )
                  cteDef.copy(child = alias.copy(child = innerWithCTE.copy(
                    plan = loop, cteDefs = newInnerCteDefs)))
                }

              // UNION case of CTE name followed by a parenthesized list of column name(s).
              case alias @ SubqueryAlias(_,
                  columnAlias @ UnresolvedSubqueryColumnAliases(
                  colNames,
                  Distinct(Union(Seq(anchor, recursion), false, false))
                )) =>
                cteDef.failAnalysis(
                  errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                  messageParameters = Map.empty)
                if (!anchor.resolved) {
                  cteDef
                } else {
                  val loop = UnionLoop(
                    cteDef.id,
                    Distinct(anchor),
                    Except(
                      rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)),
                      UnionLoopRef(cteDef.id, anchor.output, true),
                      isAll = false
                    )
                  )
                  cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
                }

              // UNION case of CTE name followed by a parenthesized list of column name(s), with
              // CTEs inside. The nested WithCTE must be bound here: without the binder the copy
              // below would be taken from the enclosing `withCTE` instead of the nested one.
              case alias @ SubqueryAlias(_,
                   columnAlias @ UnresolvedSubqueryColumnAliases(
                   colNames,
                   innerWithCTE @ WithCTE(
                     Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs)
                )) =>
                cteDef.failAnalysis(
                  errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
                  messageParameters = Map.empty)
                if (!anchor.resolved) {
                  cteDef
                } else {
                  // We need to check whether any of the inner CTEs has a self reference and replace
                  // it if needed
                  val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
                    innerCteDef.copy(child = rewriteRecursiveCTERefs(
                      innerCteDef.child, anchor, cteDef.id, Some(colNames)))
                  }
                  val loop = UnionLoop(
                    cteDef.id,
                    Distinct(anchor),
                    Except(
                      rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)),
                      UnionLoopRef(cteDef.id, anchor.output, true),
                      isAll = false
                    )
                  )
                  cteDef.copy(child = alias.copy(child = columnAlias.copy(
                    child = innerWithCTE.copy(plan = loop, cteDefs = newInnerCteDefs))))
                }

              case other =>
                // We do not support cases of sole Union (needs a SubqueryAlias above it), nor
                // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the
                // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias->
                // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union.
                throw new AnalysisException(
                  errorClass = "INVALID_RECURSIVE_CTE",
                  messageParameters = Map.empty)
            }
        }
        // Only the definitions are replaced; the body of the enclosing WithCTE is kept here and is
        // visited later by the same top-down traversal.
        withCTE.copy(cteDefs = newCTEDefs)

      // This is a non-recursive reference to a definition.
      case ref: CTERelationRef if !ref.resolved =>
        cteDefMap.get(ref.cteId).map { cteDef =>
          // cteDef is certainly resolved, otherwise it would not have been in the map.
          CTERelationRef(
            cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming, maxRows = cteDef.maxRows)
        }.getOrElse {
          // No resolved definition is known yet for this id: keep the reference unresolved.
          ref
        }

      case other =>
        // Descend into subquery expressions that contain CTEs, sharing the same definition map.
        other.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
          case e: SubqueryExpression => e.withNewPlan(resolveWithCTE(e.plan, cteDefMap))
        }
    }
  }