in spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala [619:730]
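// Entry point of the rule (sketch of the flow as written below): decide whether Comet
// applies at all (ANSI mode, load and exec flags), convert eligible Spark operators to
// Comet operators, and then post-process the result (remove placeholders, fix logical
// links, and convert consecutive native operators into native execution blocks).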
override def apply(plan: SparkPlan): SparkPlan = {
// DataFusion doesn't have ANSI mode. Unless Comet's experimental ANSI support is
// enabled, we disable Comet when ANSI mode is enabled.
if (isANSIEnabled(conf)) {
if (COMET_ANSI_MODE_ENABLED.get()) {
if (!isSpark40Plus) {
logWarning("Using Comet's experimental support for ANSI mode.")
}
} else {
logInfo("Comet extension disabled for ANSI mode")
return plan
}
}
// We shouldn't transform the Spark query plan if Comet is not loaded.
if (!isCometLoaded(conf)) return plan
if (!isCometExecEnabled(conf)) {
// Comet exec is disabled, but Comet columnar shuffle can still be used for Spark shuffle
if (isCometShuffleEnabled(conf)) {
applyCometShuffle(plan)
} else {
plan
}
} else {
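// Normalize the plan first and, when COMET_REPLACE_SMJ is enabled, rewrite
// sort-merge joins via `RewriteJoin` before attempting conversion.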
val normalizedPlan = if (CometConf.COMET_REPLACE_SMJ.get()) {
normalizePlan(plan).transformUp { case p =>
RewriteJoin.rewrite(p)
}
} else {
normalizePlan(plan)
}
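// Convert supported Spark operators in the normalized plan to their Comet
// counterparts.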
var newPlan = transform(normalizedPlan)
// If the plan cannot run fully natively, explain why (when the appropriate
// config is enabled)
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
val info = new ExtendedExplainInfo()
if (info.extensionInfo(newPlan).nonEmpty) {
logWarning(
"Comet cannot execute some parts of this plan natively " +
s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
"to disable this logging):\n" +
s"${info.generateVerboseExtendedInfo(newPlan)}")
}
}
// Remove placeholder and wrapper nodes, keeping their wrapped child plans
newPlan = newPlan.transform {
case CometSinkPlaceHolder(_, _, s) => s
case CometScanWrapper(_, s) => s
}
// Set up logical links
newPlan = newPlan.transform {
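// Each Comet operator takes its logical link from the original Spark operator it
// replaced; if the original had no logical link, clear the logical-plan tags
// instead of inheriting one.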
case op: CometExec =>
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
case op: CometShuffleExchangeExec =>
// The original Spark shuffle exchange operator might have an empty logical link,
// but the `setLogicalLink` call above on the downstream operator of
// `CometShuffleExchangeExec` would set its logical link to that of the downstream
// operators, which causes AQE behavior to be incorrect. So we need to unset the
// logical link here.
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
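// Broadcast exchanges need the same logical-link handling as shuffle exchanges
// above.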
case op: CometBroadcastExchangeExec =>
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
}
// Convert native execution blocks by linking consecutive native operators.
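// `convertBlock()` is called only on the topmost native operator of each block; the
// `firstNativeOp` flag is reset at leaf nodes and at non-native operators so that
// the next native operator starts a new block.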
var firstNativeOp = true
newPlan.transformDown {
case op: CometNativeExec =>
val newPlan = if (firstNativeOp) {
firstNativeOp = false
op.convertBlock()
} else {
op
}
// When we reach a leaf node, reset `firstNativeOp` to true because the next
// native operator encountered will start a new block.
if (op.children.isEmpty) {
firstNativeOp = true
}
newPlan
case op =>
firstNativeOp = true
op
}
}
}