in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala [68:179]
/**
 * Hook consulted by the join cases of `commonApplyFunc` before a join is replaced by one of its
 * children, either directly or wrapped in a `Project`.
 *
 * This implementation accepts every plan.
 * NOTE(review): being `protected`, this is presumably meant to be overridden by rules that must
 * keep the join for some children - confirm against the subclasses.
 *
 * @param plan the join child that would remain once the join is removed
 * @return `true` when `plan` may stand in for the join; always `true` here
 */
protected def canExecuteWithoutJoin(plan: LogicalPlan): Boolean = {
  true
}
/**
 * Rewrite cases of this rule, expressed as a partial function over logical plans.
 *
 * It is defined for four kinds of node:
 *  - a `Union` with at least one child for which `isEmpty` holds,
 *  - a `Join` none of whose children is streaming,
 *  - a `LogicalQueryStage` for which `isEmpty` holds and that is not a direct stage,
 *  - a `UnaryNode` whose children all satisfy `isEmpty`.
 *
 * A node that matches one of these cases but cannot be simplified is returned unchanged.
 *
 * NOTE(review): `isEmpty`, `nonEmpty`, `empty` and `nullValueProjectList` are defined outside
 * this block. The comments below assume that `isEmpty` / `nonEmpty` mean "known to produce no
 * rows" / "known to produce at least one row", that `empty(p)` builds an empty replacement for
 * `p`, and that `nullValueProjectList(plan)` yields null-valued columns standing in for the
 * output of `plan` - confirm against their definitions.
 */
protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = {
// Union: drop every child for which `isEmpty` holds.
case p: Union if p.children.exists(isEmpty) =>
val newChildren = p.children.filterNot(isEmpty)
if (newChildren.isEmpty) {
// No child is left, so the whole Union is replaced.
empty(p)
} else {
// A single remaining child replaces the Union directly; otherwise a smaller Union is built.
val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head
// Pair each output attribute of the new plan with the attribute at the same position in the
// output of the original Union.
val outputs = newPlan.output.zip(p.output)
// The original Union may produce different output attributes than the new plan, so we alias
// them if needed.
if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) {
newPlan
} else {
val newOutput = outputs.map { case (newAttr, oldAttr) =>
if (newAttr.exprId == oldAttr.exprId) {
newAttr
} else {
// Re-expose the new attribute under the original name and exprId. The original metadata is
// passed explicitly only when it differs from the metadata of the new attribute.
val newExplicitMetadata =
if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
}
}
Project(newOutput, newPlan)
}
}
// Joins on empty LocalRelations generated from streaming sources are not eliminated, because
// stateful streaming joins need to perform state management operations beyond just processing
// the input data.
case p @ Join(_, _, joinType, conditionOpt, _)
if !p.children.exists(_.isStreaming) =>
val isLeftEmpty = isEmpty(p.left)
val isRightEmpty = isEmpty(p.right)
// True only when the join condition is exactly the literal `false`.
val isFalseCondition = conditionOpt match {
case Some(FalseLiteral) => true
case _ => false
}
if (isLeftEmpty || isRightEmpty || isFalseCondition) {
// The order of the cases below matters: several join types appear in more than one case and
// the first case whose guard holds wins.
joinType match {
// An inner-like join is replaced as soon as one side is empty or the condition is false.
case _: InnerLike => empty(p)
// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
// Anti join in which no left row can find a match: the left side is returned as is.
case LeftAnti if (isRightEmpty | isFalseCondition) && canExecuteWithoutJoin(p.left) =>
p.left
case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
// Only the right side is empty: keep the left output and fill in the right side's columns
// with `nullValueProjectList(p.right)`.
case LeftOuter | FullOuter if isRightEmpty && canExecuteWithoutJoin(p.left) =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
case RightOuter if isRightEmpty => empty(p)
// Mirror image of the case above: only the left side is empty.
case RightOuter | FullOuter if isLeftEmpty && canExecuteWithoutJoin(p.right) =>
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
// Outer joins with a false condition are rewritten like outer joins whose other side is
// empty.
case LeftOuter if isFalseCondition && canExecuteWithoutJoin(p.left) =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
case RightOuter if isFalseCondition && canExecuteWithoutJoin(p.right) =>
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
// No case applied (for example `canExecuteWithoutJoin` rejected the child): keep the join.
case _ => p
}
} else if (joinType == LeftSemi && conditionOpt.isEmpty &&
nonEmpty(p.right) && canExecuteWithoutJoin(p.left)) {
// Semi join without a condition whose right side satisfies `nonEmpty`: the join is replaced by
// its left side.
p.left
} else if (joinType == LeftAnti && conditionOpt.isEmpty && nonEmpty(p.right)) {
// Anti join without a condition whose right side satisfies `nonEmpty`: the join is replaced by
// an empty plan.
empty(p)
} else {
p
}
// Only replace a query stage if it would lead to a reduction of operators. !p.isDirectStage
// means the physical node it contains is a partial aggregate instead of a QueryStageExec, which
// is exactly the case in which we want to propagate the empty relation.
case p: LogicalQueryStage if isEmpty(p) && !p.isDirectStage => empty(p)
// Unary operators whose (non-empty list of) children all satisfy `isEmpty`. Operators that are
// not listed below are returned unchanged by the final `case _`.
case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) => p match {
case _: Project => empty(p)
case _: Filter => empty(p)
case _: Sample => empty(p)
case _: Sort => empty(p)
// Limits are only replaced when the plan is not streaming.
case _: GlobalLimit if !p.isStreaming => empty(p)
case _: LocalLimit if !p.isStreaming => empty(p)
case _: Offset => empty(p)
// A repartition that carries the ROOT_REPARTITION tag is kept, and the tag is removed from the
// node as a side effect; an untagged repartition is replaced.
// NOTE(review): presumably the tag marks a repartition that must survive this rule once -
// confirm where ROOT_REPARTITION is set.
case _: RepartitionOperation =>
if (p.getTagValue(ROOT_REPARTITION).isEmpty) {
empty(p)
} else {
p.unsetTagValue(ROOT_REPARTITION)
p
}
case _: RebalancePartitions => empty(p)
// An aggregate with non-empty group expression will return one output row per group when the
// input to the aggregate is not empty. If the input to the aggregate is empty then all groups
// will be empty and thus the output will be empty. If we're working on batch data, we can
// then treat the aggregate as redundant.
//
// If the aggregate is over streaming data, we may need to update the state store even if no
// new rows are processed, so we can't eliminate the node.
//
// If the grouping expressions are empty, however, then the aggregate will always produce a
// single output row and thus we cannot propagate the EmptyRelation.
//
// Aggregation on an empty LocalRelation generated from a streaming source is not eliminated,
// because stateful streaming aggregation needs to perform state management operations beyond
// just processing the input data.
case Aggregate(ge, _, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
// Only `Explode` is matched here: generators like Hive-style UDTF may return their records
// within `close`.
case Generate(_: Explode, _, _, _, _, _) => empty(p)
case Expand(_, _, _) => empty(p)
case _: Window => empty(p)
case _ => p
}
}