private def isCometPlan()

in spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala [67:565]


  /** Whether `op` is already a Comet operator, i.e. an instance of [[CometPlan]]. */
  private def isCometPlan(op: SparkPlan): Boolean = op match {
    case _: CometPlan => true
    case _ => false
  }

  /**
   * Whether `op` is a [[CometNativeExec]], i.e. a Comet operator whose `nativeOp` proto can be
   * attached as a child of a parent native operator.
   */
  private def isCometNative(op: SparkPlan): Boolean = op match {
    case _: CometNativeExec => true
    case _ => false
  }

  // spotless:off
  /**
   * Tries to transform a Spark physical plan into a Comet plan.
   *
   * This rule traverses bottom-up from the original Spark plan and for each plan node, there
   * are a few cases to consider:
   *
   * 1. The child(ren) of the current node `p` cannot be converted to native
   *   In this case, we'll simply return the original Spark plan, since Comet native
   *   execution cannot start from an arbitrary Spark operator (unless it is a special node
   *   such as a scan, or a sink such as a shuffle exchange, union, etc., which are wrapped by
   *   `CometScanWrapper` and `CometSinkPlaceHolder` respectively).
   *
   * 2. The child(ren) of the current node `p` can be converted to native
   *   There are two sub-cases for this scenario: 1) This node `p` can also be converted to
   *   native. In this case, we'll create a new native Comet operator for `p` and connect it with
   *   its previously converted child(ren); 2) This node `p` cannot be converted to native. In
   *   this case, similar to 1) above, we simply return `p` as it is. Its child(ren) would still
   *   be native Comet operators.
   *
   * After this rule finishes, we'll do another pass on the final plan to convert all adjacent
   * Comet native operators into a single native execution block. Please see where
   * `convertBlock` is called below.
   *
   * Operators that stay as Spark operators are passed to `withInfo` together with an
   * explanation, where one is available.
   *
   * Here are a few examples (plans are drawn with the leaf scan at the top):
   *
   *     Scan                       ======>             CometScan
   *      |                                                |
   *     Filter                                         CometFilter
   *      |                                                |
   *     HashAggregate                                  CometHashAggregate
   *      |                                                |
   *     Exchange                                       CometExchange
   *      |                                                |
   *     HashAggregate                                  CometHashAggregate
   *      |                                                |
   *     UnsupportedOperator                            UnsupportedOperator
   *
   * Native execution doesn't necessarily have to start from `CometScan`:
   *
   *     Scan                       =======>            CometScan
   *      |                                                |
   *     UnsupportedOperator                            UnsupportedOperator
   *      |                                                |
   *     HashAggregate                                  HashAggregate
   *      |                                                |
   *     Exchange                                       CometExchange
   *      |                                                |
   *     HashAggregate                                  CometHashAggregate
   *      |                                                |
   *     UnsupportedOperator                            UnsupportedOperator
   *
   * A sink can also be a Comet operator other than `CometExchange`, for instance `CometUnion`:
   *
   *     Scan   Scan                =======>          CometScan CometScan
   *      |      |                                       |         |
   *     Filter Filter                                CometFilter CometFilter
   *      |      |                                       |         |
   *        Union                                         CometUnion
   *          |                                               |
   *        Project                                       CometProject
   *
   * @param plan the Spark physical plan to transform
   * @return the plan with every convertible operator replaced by its Comet counterpart
   */
  // spotless:on
  private def transform(plan: SparkPlan): SparkPlan = {

    /**
     * Serializes `op` to a native operator, but only when every child is already a
     * `CometNativeExec` (their `nativeOp` protos are attached as the children). Returns `None`
     * when some child is not native or when `QueryPlanSerde.operator2Proto` returns `None`.
     */
    def operator2Proto(op: SparkPlan): Option[Operator] = {
      if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
        QueryPlanSerde.operator2Proto(
          op,
          op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
      } else {
        None
      }
    }

    /**
     * Convert operator to proto and then apply a transformation to wrap the proto in a new plan.
     * Falls back to `op` unchanged when it cannot be serialized.
     */
    def newPlanWithProto(op: SparkPlan, fun: Operator => SparkPlan): SparkPlan = {
      operator2Proto(op).map(fun).getOrElse(op)
    }

    plan.transformUp {
      // Fully native scan for V1
      case scan: CometScanExec
          if COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
        val nativeOp = QueryPlanSerde.operator2Proto(scan).get
        CometNativeScanExec(nativeOp, scan.wrapped, scan.session)

      // Comet JVM + native scan for V1 and V2
      case op if isCometScan(op) =>
        val nativeOp = QueryPlanSerde.operator2Proto(op)
        CometScanWrapper(nativeOp.get, op)

      case op if shouldApplySparkToColumnar(conf, op) =>
        val cometOp = CometSparkToColumnarExec(op)
        val nativeOp = QueryPlanSerde.operator2Proto(cometOp)
        CometScanWrapper(nativeOp.get, cometOp)

      case op: ProjectExec =>
        newPlanWithProto(
          op,
          CometProjectExec(_, op, op.output, op.projectList, op.child, SerializedPlan(None)))

      case op: FilterExec =>
        newPlanWithProto(
          op,
          CometFilterExec(_, op, op.output, op.condition, op.child, SerializedPlan(None)))

      case op: SortExec =>
        newPlanWithProto(
          op,
          CometSortExec(
            _,
            op,
            op.output,
            op.outputOrdering,
            op.sortOrder,
            op.child,
            SerializedPlan(None)))

      case op: LocalLimitExec =>
        newPlanWithProto(op, CometLocalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))

      case op: GlobalLimitExec if op.offset == 0 =>
        newPlanWithProto(
          op,
          CometGlobalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))

      case op: CollectLimitExec
          if isCometNative(op.child) && CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)
            && isCometShuffleEnabled(conf)
            && op.offset == 0 =>
        QueryPlanSerde
          .operator2Proto(op)
          .map { nativeOp =>
            val cometOp =
              CometCollectLimitExec(op, op.limit, op.offset, op.child)
            CometSinkPlaceHolder(nativeOp, op, cometOp)
          }
          .getOrElse(op)

      case op: ExpandExec =>
        newPlanWithProto(
          op,
          CometExpandExec(_, op, op.output, op.projections, op.child, SerializedPlan(None)))

      // When Comet shuffle is disabled, we don't want to transform the HashAggregate
      // to CometHashAggregate. Otherwise, we probably get partial Comet aggregation
      // and final Spark aggregation.
      // The type checks are parenthesized on purpose: `&&` binds tighter than `||`, so without
      // the parentheses the shuffle check would only apply to ObjectHashAggregateExec.
      case op: BaseAggregateExec
          if (op.isInstanceOf[HashAggregateExec] ||
            op.isInstanceOf[ObjectHashAggregateExec]) &&
            isCometShuffleEnabled(conf) =>
        val modes = op.aggregateExpressions.map(_.mode).distinct
        // In distinct aggregates there can be a combination of modes
        val multiMode = modes.size > 1
        // For a final mode HashAggregate, we only need to transform the HashAggregate
        // if there is Comet partial aggregation.
        val sparkFinalMode = modes.contains(Final) && findCometPartialAgg(op.child).isEmpty

        if (multiMode || sparkFinalMode) {
          op
        } else {
          newPlanWithProto(
            op,
            nativeOp => {
              // The aggExprs could be empty. For example, if the aggregate functions only have
              // distinct aggregate functions or only have group by, the aggExprs is empty and
              // modes is empty too. If aggExprs is not empty, we need to verify all the
              // aggregates have the same mode.
              assert(modes.length == 1 || modes.isEmpty)
              CometHashAggregateExec(
                nativeOp,
                op,
                op.output,
                op.groupingExpressions,
                op.aggregateExpressions,
                op.resultExpressions,
                op.child.output,
                modes.headOption,
                op.child,
                SerializedPlan(None))
            })
        }

      // Hash-based aggregates are left to Spark when Comet shuffle is disabled (see above);
      // record why instead of falling through to the generic "not supported" message.
      case op: BaseAggregateExec
          if (op.isInstanceOf[HashAggregateExec] ||
            op.isInstanceOf[ObjectHashAggregateExec]) &&
            !isCometShuffleEnabled(conf) =>
        withInfo(op, s"${op.nodeName} requires shuffle to be enabled")

      case op: ShuffledHashJoinExec
          if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) &&
            op.children.forall(isCometNative) =>
        newPlanWithProto(
          op,
          CometHashJoinExec(
            _,
            op,
            op.output,
            op.outputOrdering,
            op.leftKeys,
            op.rightKeys,
            op.joinType,
            op.condition,
            op.buildSide,
            op.left,
            op.right,
            SerializedPlan(None)))

      case op: ShuffledHashJoinExec if !CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) =>
        withInfo(op, "ShuffleHashJoin is not enabled")

      case op: ShuffledHashJoinExec if !op.children.forall(isCometNative) =>
        op

      case op: BroadcastHashJoinExec
          if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) &&
            op.children.forall(isCometNative) =>
        newPlanWithProto(
          op,
          CometBroadcastHashJoinExec(
            _,
            op,
            op.output,
            op.outputOrdering,
            op.leftKeys,
            op.rightKeys,
            op.joinType,
            op.condition,
            op.buildSide,
            op.left,
            op.right,
            SerializedPlan(None)))

      case op: SortMergeJoinExec
          if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) &&
            op.children.forall(isCometNative) =>
        newPlanWithProto(
          op,
          CometSortMergeJoinExec(
            _,
            op,
            op.output,
            op.outputOrdering,
            op.leftKeys,
            op.rightKeys,
            op.joinType,
            op.condition,
            op.left,
            op.right,
            SerializedPlan(None)))

      case op: SortMergeJoinExec if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) =>
        withInfo(op, "SortMergeJoin is not enabled")

      case op: SortMergeJoinExec if !op.children.forall(isCometNative) =>
        op

      case c @ CoalesceExec(numPartitions, child)
          if CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf)
            && isCometNative(child) =>
        QueryPlanSerde
          .operator2Proto(c)
          .map { nativeOp =>
            val cometOp = CometCoalesceExec(c, c.output, numPartitions, child)
            CometSinkPlaceHolder(nativeOp, c, cometOp)
          }
          .getOrElse(c)

      case c @ CoalesceExec(_, _) if !CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf) =>
        withInfo(c, "Coalesce is not enabled")

      case op: CoalesceExec if !op.children.forall(isCometNative) =>
        op

      case s: TakeOrderedAndProjectExec
          if isCometNative(s.child) && CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED
            .get(conf)
            && isCometShuffleEnabled(conf) &&
            CometTakeOrderedAndProjectExec.isSupported(s) =>
        QueryPlanSerde
          .operator2Proto(s)
          .map { nativeOp =>
            val cometOp =
              CometTakeOrderedAndProjectExec(
                s,
                s.output,
                s.limit,
                s.sortOrder,
                s.projectList,
                s.child)
            CometSinkPlaceHolder(nativeOp, s, cometOp)
          }
          .getOrElse(s)

      case s: TakeOrderedAndProjectExec =>
        val info1 = createMessage(
          !CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED.get(conf),
          "TakeOrderedAndProject is not enabled")
        val info2 = createMessage(
          !isCometShuffleEnabled(conf),
          "TakeOrderedAndProject requires shuffle to be enabled")
        withInfo(s, Seq(info1, info2).flatten.mkString(","))

      case w: WindowExec =>
        newPlanWithProto(
          w,
          CometWindowExec(
            _,
            w,
            w.output,
            w.windowExpression,
            w.partitionSpec,
            w.orderSpec,
            w.child,
            SerializedPlan(None)))

      case u: UnionExec
          if CometConf.COMET_EXEC_UNION_ENABLED.get(conf) &&
            u.children.forall(isCometNative) =>
        newPlanWithProto(
          u, {
            val cometOp = CometUnionExec(u, u.output, u.children)
            CometSinkPlaceHolder(_, u, cometOp)
          })

      case u: UnionExec if !CometConf.COMET_EXEC_UNION_ENABLED.get(conf) =>
        withInfo(u, "Union is not enabled")

      case op: UnionExec if !op.children.forall(isCometNative) =>
        op

      // For AQE broadcast stage on a Comet broadcast exchange
      case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
        newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))

      case s @ BroadcastQueryStageExec(
            _,
            ReusedExchangeExec(_, _: CometBroadcastExchangeExec),
            _) =>
        newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))

      // `CometBroadcastExchangeExec`'s broadcast output is not compatible with Spark's broadcast
      // exchange. It is only used for Comet native execution. We only transform Spark broadcast
      // exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
      // broadcast exchange is forced to be enabled by Comet config.
      case parent if parent.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
        val newChildren = parent.children.map {
          case b: BroadcastExchangeExec
              if isCometNative(b.child) &&
                CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
            QueryPlanSerde.operator2Proto(b) match {
              case Some(nativeOp) =>
                val cometOp = CometBroadcastExchangeExec(b, b.output, b.mode, b.child)
                CometSinkPlaceHolder(nativeOp, b, cometOp)
              case None => b
            }
          case other => other
        }
        if (newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) {
          // At least one broadcast exchange could not be converted: keep the Spark plan.
          parent
        } else {
          // Re-apply the rule to the parent with its converted children so the parent itself
          // (e.g. a broadcast hash join) gets a chance to become native.
          val newPlan = apply(parent.withNewChildren(newChildren))
          // The Comet broadcast is only kept when the downstream became native or when it is
          // forced by config, since its output cannot be consumed by Spark operators.
          if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) newPlan else parent
        }

      // this case should be checked only after the previous case checking for a
      // child BroadcastExchange has been applied, otherwise that transform
      // never gets applied
      case op: BroadcastHashJoinExec if !op.children.forall(isCometNative) =>
        op

      case op: BroadcastHashJoinExec
          if !CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) =>
        withInfo(op, "BroadcastHashJoin is not enabled")

      // For AQE shuffle stage on a Comet shuffle exchange
      case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
        newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))

      // For AQE shuffle stage on a reused Comet shuffle exchange
      // Note that we don't need to handle `ReusedExchangeExec` for non-AQE case, because
      // the query plan won't be re-optimized/planned in non-AQE mode.
      case s @ ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
        newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))

      // Native shuffle for Comet operators
      case s: ShuffleExchangeExec =>
        // Each support check is computed at most once, and only when it is actually needed.
        lazy val nativeSupport = QueryPlanSerde.nativeShuffleSupported(s)
        lazy val columnarSupport = QueryPlanSerde.columnarShuffleSupported(s)

        val nativePrecondition = isCometShuffleEnabled(conf) &&
          isCometNativeShuffleMode(conf) &&
          nativeSupport._1

        val nativeShuffle: Option[SparkPlan] =
          if (nativePrecondition) {
            operator2Proto(s).map { nativeOp =>
              // Switch to use Decimal128 regardless of precision, since Arrow native execution
              // doesn't support Decimal32 and Decimal64 yet.
              conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
              val cometOp = CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
              CometSinkPlaceHolder(nativeOp, s, cometOp)
            }
          } else {
            None
          }

        // this is a temporary workaround because some Spark SQL tests fail
        // when we enable COMET_SHUFFLE_FALLBACK_TO_COLUMNAR due to valid bugs
        // that we had not previously seen
        val tryColumnarNext =
          !nativePrecondition || (nativeShuffle.isEmpty &&
            COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf))

        val nativeOrColumnarShuffle: Option[SparkPlan] = nativeShuffle.orElse {
          // Columnar shuffle for regular Spark operators (not Comet) and Comet operators
          // (if configured).
          // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
          // convert it to CometColumnarShuffle.
          if (tryColumnarNext &&
            isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) &&
            columnarSupport._1 &&
            !isShuffleOperator(s.child)) {
            QueryPlanSerde.operator2Proto(s).flatMap { nativeOp =>
              s.child match {
                case n if n.isInstanceOf[CometNativeExec] || !n.supportsColumnar =>
                  val cometOp =
                    CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
                  Some(CometSinkPlaceHolder(nativeOp, s, cometOp))
                case _ =>
                  None
              }
            }
          } else {
            None
          }
        }

        nativeOrColumnarShuffle.getOrElse {
          val isShuffleEnabled = isCometShuffleEnabled(conf)
          val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available")
          val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not enabled: $reason")
          val columnarShuffleEnabled = isCometJVMShuffleMode(conf)
          val msg2 = createMessage(
            isShuffleEnabled && !columnarShuffleEnabled && !nativeSupport._1,
            s"Native shuffle: ${nativeSupport._2}")
          val msg3 = createMessage(
            isShuffleEnabled && columnarShuffleEnabled && !columnarSupport._1,
            s"JVM shuffle: ${columnarSupport._2}")
          withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
        }

      case op =>
        op match {
          case _: CometExec | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
              _: CometBroadcastExchangeExec | _: CometShuffleExchangeExec =>
            // Some execs should never be replaced. We include
            // these cases specially here so we do not add a misleading 'info' message
            op
          case _ =>
            // An operator that is not supported by Comet
            withInfo(op, s"${op.nodeName} is not supported")
        }
    }
  }