in spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala [67:565]
private def isCometPlan(op: SparkPlan): Boolean = op.isInstanceOf[CometPlan]
private def isCometNative(op: SparkPlan): Boolean = op.isInstanceOf[CometNativeExec]
// spotless:off
/**
* Tries to transform a Spark physical plan into a Comet plan.
*
* This rule traverses the original Spark plan bottom-up and, for each plan node, considers
* a few cases:
*
* 1. The child(ren) of the current node `p` cannot be converted to native
* In this case, we'll simply return the original Spark plan, since Comet native
* execution cannot start from an arbitrary Spark operator (unless it is a special node
* such as a scan, or a sink such as a shuffle exchange or union, which are wrapped in
* `CometScanWrapper` and `CometSinkPlaceHolder` respectively).
*
* 2. The child(ren) of the current node `p` can be converted to native
* There are two sub-cases for this scenario: 1) This node `p` can also be converted to
* native. In this case, we'll create a new native Comet operator for `p` and connect it to
* its previously converted child(ren); 2) This node `p` cannot be converted to native. In
* this case, as in case 1 above, we simply return `p` as it is, but its child(ren) remain
* native Comet operators.
*
* After this rule finishes, we'll do another pass over the final plan to fuse all adjacent
* Comet native operators into single native execution blocks. Please see where
* `convertBlock` is called below.
*
* Here are a few examples:
*
* Scan ======> CometScan
* | |
* Filter CometFilter
* | |
* HashAggregate CometHashAggregate
* | |
* Exchange CometExchange
* | |
* HashAggregate CometHashAggregate
* | |
* UnsupportedOperator UnsupportedOperator
*
* Native execution doesn't necessarily have to start from `CometScan`:
*
* Scan =======> CometScan
* | |
* UnsupportedOperator UnsupportedOperator
* | |
* HashAggregate HashAggregate
* | |
* Exchange CometExchange
* | |
* HashAggregate CometHashAggregate
* | |
* UnsupportedOperator UnsupportedOperator
*
* A sink can also be a Comet operator other than `CometExchange`, for instance `CometUnion`:
*
* Scan Scan =======> CometScan CometScan
* | | | |
* Filter Filter CometFilter CometFilter
* | | | |
* Union CometUnion
* | |
* Project CometProject
*/
// spotless:on
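// A minimal sketch (illustration only; `toComet` is a hypothetical helper) of the
// bottom-up contract that `transformUp` provides below: Catalyst rewrites children
// before offering the parent to the partial function, so by the time a case fires,
// `op.children` are already the converted Comet operators.
//
//   plan.transformUp {
//     case op: FilterExec if op.child.isInstanceOf[CometNativeExec] =>
//       toComet(op) // hypothetical: serialize `op` and wrap it in a Comet exec
//   }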
private def transform(plan: SparkPlan): SparkPlan = {
def operator2Proto(op: SparkPlan): Option[Operator] = {
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
QueryPlanSerde.operator2Proto(
op,
op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
} else {
None
}
}
/**
* Converts an operator to protobuf and, if successful, applies `fun` to wrap the serialized
* operator in a new plan; otherwise returns the original operator unchanged.
*/
def newPlanWithProto(op: SparkPlan, fun: Operator => SparkPlan): SparkPlan = {
operator2Proto(op).map(fun).getOrElse(op)
}
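// Usage sketch (grounded in the cases below): every conversion follows this shape,
//
//   case op: ProjectExec =>
//     newPlanWithProto(op, CometProjectExec(_, op, ...))
//
// i.e. if serialization to protobuf succeeds the Spark operator is replaced,
// otherwise it is returned untouched.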
plan.transformUp {
// Fully native scan for V1
case scan: CometScanExec
if COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
val nativeOp = QueryPlanSerde.operator2Proto(scan).get
CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
// Comet JVM + native scan for V1 and V2
case op if isCometScan(op) =>
val nativeOp = QueryPlanSerde.operator2Proto(op)
CometScanWrapper(nativeOp.get, op)
case op if shouldApplySparkToColumnar(conf, op) =>
val cometOp = CometSparkToColumnarExec(op)
val nativeOp = QueryPlanSerde.operator2Proto(cometOp)
CometScanWrapper(nativeOp.get, cometOp)
case op: ProjectExec =>
newPlanWithProto(
op,
CometProjectExec(_, op, op.output, op.projectList, op.child, SerializedPlan(None)))
case op: FilterExec =>
newPlanWithProto(
op,
CometFilterExec(_, op, op.output, op.condition, op.child, SerializedPlan(None)))
case op: SortExec =>
newPlanWithProto(
op,
CometSortExec(
_,
op,
op.output,
op.outputOrdering,
op.sortOrder,
op.child,
SerializedPlan(None)))
case op: LocalLimitExec =>
newPlanWithProto(op, CometLocalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))
case op: GlobalLimitExec if op.offset == 0 =>
newPlanWithProto(
op,
CometGlobalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))
case op: CollectLimitExec
if isCometNative(op.child) && CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)
&& isCometShuffleEnabled(conf)
&& op.offset == 0 =>
QueryPlanSerde
.operator2Proto(op)
.map { nativeOp =>
val cometOp =
CometCollectLimitExec(op, op.limit, op.offset, op.child)
CometSinkPlaceHolder(nativeOp, op, cometOp)
}
.getOrElse(op)
case op: ExpandExec =>
newPlanWithProto(
op,
CometExpandExec(_, op, op.output, op.projections, op.child, SerializedPlan(None)))
// When Comet shuffle is disabled, we don't want to transform the HashAggregate
// to CometHashAggregate. Otherwise, we could end up with Comet partial
// aggregation feeding Spark final aggregation.
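// For example, if only the partial side were converted, we could end up with a
// mixed plan whose stages cannot exchange intermediate aggregation buffers
// (shape for illustration):
//
//   HashAggregate (Final)           <- Spark, expects Spark's partial buffers
//   |
//   Exchange                        <- Spark shuffle
//   |
//   CometHashAggregate (Partial)    <- produces Comet buffers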
case op: BaseAggregateExec
if (op.isInstanceOf[HashAggregateExec] ||
op.isInstanceOf[ObjectHashAggregateExec]) &&
isCometShuffleEnabled(conf) =>
val modes = op.aggregateExpressions.map(_.mode).distinct
// In distinct aggregates there can be a combination of modes
val multiMode = modes.size > 1
// For a final mode HashAggregate, we only need to transform the HashAggregate
// if there is Comet partial aggregation.
val sparkFinalMode = modes.contains(Final) && findCometPartialAgg(op.child).isEmpty
if (multiMode || sparkFinalMode) {
op
} else {
newPlanWithProto(
op,
nativeOp => {
// The aggregate expressions could be empty. For example, if the query only uses
// distinct aggregate functions, or only groups by, then aggregateExpressions is
// empty and modes is empty too. If it is not empty, we need to verify that all
// the aggregates have the same mode.
assert(modes.length == 1 || modes.isEmpty)
CometHashAggregateExec(
nativeOp,
op,
op.output,
op.groupingExpressions,
op.aggregateExpressions,
op.resultExpressions,
op.child.output,
modes.headOption,
op.child,
SerializedPlan(None))
})
}
case op: ShuffledHashJoinExec
if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) &&
op.children.forall(isCometNative) =>
newPlanWithProto(
op,
CometHashJoinExec(
_,
op,
op.output,
op.outputOrdering,
op.leftKeys,
op.rightKeys,
op.joinType,
op.condition,
op.buildSide,
op.left,
op.right,
SerializedPlan(None)))
case op: ShuffledHashJoinExec if !CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) =>
withInfo(op, "ShuffleHashJoin is not enabled")
case op: ShuffledHashJoinExec if !op.children.forall(isCometNative) =>
op
case op: BroadcastHashJoinExec
if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) &&
op.children.forall(isCometNative) =>
newPlanWithProto(
op,
CometBroadcastHashJoinExec(
_,
op,
op.output,
op.outputOrdering,
op.leftKeys,
op.rightKeys,
op.joinType,
op.condition,
op.buildSide,
op.left,
op.right,
SerializedPlan(None)))
case op: SortMergeJoinExec
if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) &&
op.children.forall(isCometNative) =>
newPlanWithProto(
op,
CometSortMergeJoinExec(
_,
op,
op.output,
op.outputOrdering,
op.leftKeys,
op.rightKeys,
op.joinType,
op.condition,
op.left,
op.right,
SerializedPlan(None)))
case op: SortMergeJoinExec
if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) &&
!op.children.forall(isCometNative) =>
op
case op: SortMergeJoinExec if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) =>
withInfo(op, "SortMergeJoin is not enabled")
case op: SortMergeJoinExec if !op.children.forall(isCometNative) =>
op
case c @ CoalesceExec(numPartitions, child)
if CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf)
&& isCometNative(child) =>
QueryPlanSerde
.operator2Proto(c)
.map { nativeOp =>
val cometOp = CometCoalesceExec(c, c.output, numPartitions, child)
CometSinkPlaceHolder(nativeOp, c, cometOp)
}
.getOrElse(c)
case c @ CoalesceExec(_, _) if !CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf) =>
withInfo(c, "Coalesce is not enabled")
case op: CoalesceExec if !op.children.forall(isCometNative) =>
op
case s: TakeOrderedAndProjectExec
if isCometNative(s.child) && CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED
.get(conf)
&& isCometShuffleEnabled(conf) &&
CometTakeOrderedAndProjectExec.isSupported(s) =>
QueryPlanSerde
.operator2Proto(s)
.map { nativeOp =>
val cometOp =
CometTakeOrderedAndProjectExec(
s,
s.output,
s.limit,
s.sortOrder,
s.projectList,
s.child)
CometSinkPlaceHolder(nativeOp, s, cometOp)
}
.getOrElse(s)
case s: TakeOrderedAndProjectExec =>
val info1 = createMessage(
!CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED.get(conf),
"TakeOrderedAndProject is not enabled")
val info2 = createMessage(
!isCometShuffleEnabled(conf),
"TakeOrderedAndProject requires shuffle to be enabled")
withInfo(s, Seq(info1, info2).flatten.mkString(","))
case w: WindowExec =>
newPlanWithProto(
w,
CometWindowExec(
_,
w,
w.output,
w.windowExpression,
w.partitionSpec,
w.orderSpec,
w.child,
SerializedPlan(None)))
case u: UnionExec
if CometConf.COMET_EXEC_UNION_ENABLED.get(conf) &&
u.children.forall(isCometNative) =>
newPlanWithProto(
u, {
val cometOp = CometUnionExec(u, u.output, u.children)
CometSinkPlaceHolder(_, u, cometOp)
})
case u: UnionExec if !CometConf.COMET_EXEC_UNION_ENABLED.get(conf) =>
withInfo(u, "Union is not enabled")
case op: UnionExec if !op.children.forall(isCometNative) =>
op
// For AQE broadcast stage on a Comet broadcast exchange
case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))
case s @ BroadcastQueryStageExec(
_,
ReusedExchangeExec(_, _: CometBroadcastExchangeExec),
_) =>
newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))
// `CometBroadcastExchangeExec`'s broadcast output is not compatible with Spark's broadcast
// exchange. It is only used for Comet native execution. We only transform Spark broadcast
// exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
// broadcast exchange is forced to be enabled by Comet config.
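// For illustration, the conversion this guard prevents:
//
//   SparkOperator (rows)                SparkOperator (rows)
//   |                          =/=>     |
//   BroadcastExchange                   CometBroadcastExchange   <- broadcasts Arrow
//                                          batches that the Spark operator cannot read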
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
val newChildren = plan.children.map {
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
QueryPlanSerde.operator2Proto(b) match {
case Some(nativeOp) =>
val cometOp = CometBroadcastExchangeExec(b, b.output, b.mode, b.child)
CometSinkPlaceHolder(nativeOp, b, cometOp)
case None => b
}
case other => other
}
if (!newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) {
val newPlan = apply(plan.withNewChildren(newChildren))
if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) {
newPlan
} else {
// `newPlan` is not native here (that case returned above), so record the
// reason why the broadcast exchange was not converted before falling back.
val reason =
getCometBroadcastNotEnabledReason(conf).getOrElse("no reason available")
withInfo(plan, s"Broadcast is not enabled: $reason")
plan
}
} else {
plan
}
// These cases must be matched only after the previous case, which looks for a
// child BroadcastExchange, has had a chance to fire; otherwise that transform
// would never be applied.
case op: BroadcastHashJoinExec if !op.children.forall(isCometNative) =>
op
case op: BroadcastHashJoinExec
if !CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) =>
withInfo(op, "BroadcastHashJoin is not enabled")
// For AQE shuffle stage on a Comet shuffle exchange
case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))
// For AQE shuffle stage on a reused Comet shuffle exchange
// Note that we don't need to handle `ReusedExchangeExec` for non-AQE case, because
// the query plan won't be re-optimized/planned in non-AQE mode.
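// For illustration, the AQE plan shape matched here:
//
//   ShuffleQueryStage
//   |
//   ReusedExchange              <- inserted by AQE when two stages share one shuffle
//   |
//   CometShuffleExchange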
case s @ ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))
// Native shuffle for Comet operators
case s: ShuffleExchangeExec =>
val nativePrecondition = isCometShuffleEnabled(conf) &&
isCometNativeShuffleMode(conf) &&
QueryPlanSerde.nativeShuffleSupported(s)._1
val nativeShuffle: Option[SparkPlan] =
if (nativePrecondition) {
val newOp = operator2Proto(s)
newOp match {
case Some(nativeOp) =>
// Switch to use Decimal128 regardless of precision, since Arrow native execution
// doesn't support Decimal32 and Decimal64 yet.
conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
val cometOp = CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
Some(CometSinkPlaceHolder(nativeOp, s, cometOp))
case None =>
None
}
} else {
None
}
// This is a temporary workaround: enabling COMET_SHUFFLE_FALLBACK_TO_COLUMNAR
// makes some Spark SQL tests fail due to genuine bugs that we had not
// previously seen.
val tryColumnarNext =
!nativePrecondition || (nativeShuffle.isEmpty &&
COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf))
val nativeOrColumnarShuffle = if (nativeShuffle.isDefined) {
nativeShuffle
} else if (tryColumnarNext) {
// Columnar shuffle for regular Spark operators (not Comet) and Comet operators
// (if configured).
// If the child of this ShuffleExchangeExec is itself a shuffle operator, we
// should not convert it to CometColumnarShuffle.
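// For illustration, the shape this check skips:
//
//   ShuffleExchange             <- candidate for columnar conversion
//   |
//   ShuffleExchange             <- child is itself a shuffle, so fall through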
if (isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) &&
QueryPlanSerde.columnarShuffleSupported(s)._1 &&
!isShuffleOperator(s.child)) {
val newOp = QueryPlanSerde.operator2Proto(s)
newOp match {
case Some(nativeOp) =>
s.child match {
case n if n.isInstanceOf[CometNativeExec] || !n.supportsColumnar =>
val cometOp =
CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
Some(CometSinkPlaceHolder(nativeOp, s, cometOp))
case _ =>
None
}
case None =>
None
}
} else {
None
}
} else {
None
}
if (nativeOrColumnarShuffle.isDefined) {
nativeOrColumnarShuffle.get
} else {
val isShuffleEnabled = isCometShuffleEnabled(conf)
val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available")
val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not enabled: $reason")
val columnarShuffleEnabled = isCometJVMShuffleMode(conf)
// Evaluate each support check once and reuse both the flag and its info message.
val (nativeSupported, nativeInfo) = QueryPlanSerde.nativeShuffleSupported(s)
val msg2 = createMessage(
isShuffleEnabled && !columnarShuffleEnabled && !nativeSupported,
s"Native shuffle: $nativeInfo")
val (columnarSupported, columnarInfo) = QueryPlanSerde.columnarShuffleSupported(s)
val msg3 = createMessage(
isShuffleEnabled && columnarShuffleEnabled && !columnarSupported,
s"JVM shuffle: $columnarInfo")
withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
}
case op =>
op match {
case _: CometExec | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
_: CometBroadcastExchangeExec | _: CometShuffleExchangeExec =>
// Some execs should never be replaced. We include
// these cases specially here so we do not add a misleading 'info' message
op
case _ =>
// An operator that is not supported by Comet
withInfo(op, s"${op.nodeName} is not supported")
}
}
}