protected def canExecuteWithoutJoin()

in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala [68:179]


  /**
   * Decides whether a join may be replaced by (a projection over) one of its children, `plan`,
   * instead of being executed. `commonApplyFunc` checks this before rewriting a join into its
   * left or right side.
   *
   * This default always permits the rewrite. The method is `protected` and not `final`, so a
   * subclass may override it to refuse particular children.
   *
   * @param plan the join child that would stand in for the join
   * @return `true`, unconditionally
   */
  protected def canExecuteWithoutJoin(plan: LogicalPlan): Boolean = {
    // `plan` is not inspected by this default implementation.
    true
  }

  /**
   * The rewrites shared by every flavour of this rule. Each case removes or simplifies an
   * operator whose input is empty according to `isEmpty` (or, for joins, whose condition is the
   * literal `false`). Plans that no case matches are not handled by this partial function.
   *
   * Cases are tried top to bottom and the first one whose guard holds wins. The order of the
   * join cases below is therefore significant.
   *
   * NOTE(review): `isEmpty`, `nonEmpty`, `empty` and `nullValueProjectList` are defined outside
   * this view. `empty(p)` presumably builds an empty relation with `p`'s output, and
   * `nullValueProjectList(plan)` presumably yields null-valued columns standing in for `plan`'s
   * output. Confirm against their definitions.
   */
  protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = {
    // Drop the empty children of a Union. If none are left, the Union itself is empty.
    // Otherwise the survivors (a single child, or a smaller Union) take its place.
    case p: Union if p.children.exists(isEmpty) =>
      val newChildren = p.children.filterNot(isEmpty)
      if (newChildren.isEmpty) {
        empty(p)
      } else {
        val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head
        val outputs = newPlan.output.zip(p.output)
        // The original Union may produce different output attributes than the new plan, so we
        // alias them where needed. Each alias reuses the old attribute's exprId and name (and its
        // metadata when that differs), so references to the Union's output keep matching.
        if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) {
          newPlan
        } else {
          val newOutput = outputs.map { case (newAttr, oldAttr) =>
            if (newAttr.exprId == oldAttr.exprId) {
              newAttr
            } else {
              val newExplicitMetadata =
                if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
              Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
            }
          }
          Project(newOutput, newPlan)
        }
      }

    // Joins with any streaming child (e.g. empty LocalRelations generated from streaming sources)
    // are not eliminated. Stateful streaming joins need to perform state management operations
    // beyond just processing the input data.
    case p @ Join(_, _, joinType, conditionOpt, _)
        if !p.children.exists(_.isStreaming) =>
      val isLeftEmpty = isEmpty(p.left)
      val isRightEmpty = isEmpty(p.right)
      val isFalseCondition = conditionOpt match {
        case Some(FalseLiteral) => true
        case _ => false
      }
      if (isLeftEmpty || isRightEmpty || isFalseCondition) {
        joinType match {
          // An inner-like join yields no rows when either side is empty or nothing can match.
          case _: InnerLike => empty(p)
          // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
          // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
          // Every output row of these joins derives from a left row, so an empty left is empty.
          case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
          // A semi join keeps only left rows that find a match, and here nothing can match.
          case LeftSemi if isRightEmpty || isFalseCondition => empty(p)
          // An anti join keeps the left rows that find no match, which here is all of them.
          case LeftAnti if (isRightEmpty || isFalseCondition) && canExecuteWithoutJoin(p.left) =>
            p.left
          case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
          // Outer joins whose non-preserved side is empty keep the preserved side. The other
          // side's columns are padded via `nullValueProjectList`.
          case LeftOuter | FullOuter if isRightEmpty && canExecuteWithoutJoin(p.left) =>
            Project(p.left.output ++ nullValueProjectList(p.right), p.left)
          case RightOuter if isRightEmpty => empty(p)
          case RightOuter | FullOuter if isLeftEmpty && canExecuteWithoutJoin(p.right) =>
            Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
          // With a `false` condition no row ever matches. A one-sided outer join therefore
          // degenerates to its preserved side, padded the same way.
          case LeftOuter if isFalseCondition && canExecuteWithoutJoin(p.left) =>
            Project(p.left.output ++ nullValueProjectList(p.right), p.left)
          case RightOuter if isFalseCondition && canExecuteWithoutJoin(p.right) =>
            Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
          // Everything else is left unchanged. Examples: a full outer join with a `false`
          // condition and two non-empty sides, or a rewrite refused by `canExecuteWithoutJoin`.
          case _ => p
        }
      } else if (joinType == LeftSemi && conditionOpt.isEmpty &&
        nonEmpty(p.right) && canExecuteWithoutJoin(p.left)) {
        // An unconditional semi join against a non-empty right side keeps every left row.
        p.left
      } else if (joinType == LeftAnti && conditionOpt.isEmpty && nonEmpty(p.right)) {
        // An unconditional anti join against a non-empty right side keeps no left row.
        empty(p)
      } else {
        p
      }

    // Only replace a query stage if it would lead to a reduction of operators. !p.isDirectStage
    // means the physical node it contains is partial aggregate instead of QueryStageExec, which
    // is exactly what we want to propagate empty relation.
    case p: LogicalQueryStage if isEmpty(p) && !p.isDirectStage => empty(p)

    // A unary operator whose child is empty is replaced by an empty relation when its own output
    // is then necessarily empty as well. Any other unary operator is kept.
    case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) => p match {
      case _: Project => empty(p)
      case _: Filter => empty(p)
      case _: Sample => empty(p)
      case _: Sort => empty(p)
      // Limits are only removed from non-streaming plans.
      case _: GlobalLimit if !p.isStreaming => empty(p)
      case _: LocalLimit if !p.isStreaming => empty(p)
      case _: Offset => empty(p)
      // An untagged repartition is removed. One carrying ROOT_REPARTITION is kept, and the tag
      // is cleared on `p` in place.
      // NOTE(review): the tag is set outside this view, presumably to preserve a repartition at
      // the root of the plan. Confirm.
      case _: RepartitionOperation =>
        if (p.getTagValue(ROOT_REPARTITION).isEmpty) {
          empty(p)
        } else {
          p.unsetTagValue(ROOT_REPARTITION)
          p
        }
      case _: RebalancePartitions => empty(p)
      // An aggregate with non-empty group expression will return one output row per group when the
      // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
      // will be empty and thus the output will be empty. If we're working on batch data, we can
      // then treat the aggregate as redundant.
      //
      // If the aggregate is over streaming data, we may need to update the state store even if no
      // new rows are processed, so we can't eliminate the node.
      //
      // If the grouping expressions are empty, however, then the aggregate will always produce a
      // single output row and thus we cannot propagate the EmptyRelation.
      //
      // Aggregation on empty LocalRelation generated from a streaming source is not eliminated
      // as stateful streaming aggregation need to perform other state management operations other
      // than just processing the input data.
      case Aggregate(ge, _, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
      // Only `Explode` is removed. Other generators, such as Hive-style UDTFs, may return
      // records from `close` even when they receive no input rows.
      case Generate(_: Explode, _, _, _, _, _) => empty(p)
      case Expand(_, _, _) => empty(p)
      case _: Window => empty(p)
      case _ => p
    }
  }