def apply()

in sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala [879:1082]


    /**
     * Maps one logical operator onto the physical operator(s) that execute it.
     *
     * Children that still need planning are passed through `planLater`. The order of the cases
     * matters: the combined limit/offset patterns have to be tried before the individual limit
     * and offset patterns.
     *
     * Operators that the optimizer is expected to have rewritten (distinct, intersect, except,
     * resolved hints, non-streaming deduplicate) raise the error built by
     * `SparkException.internalError`. Operators that have no batch implementation here
     * (`FlatMapGroupsInPandasWithState`, `UpdateTable`, `MergeIntoTable`) raise an
     * unsupported-operation error.
     *
     * @param plan the logical operator to translate
     * @return a one-element sequence with the physical plan, or `Nil` when no case applies
     */
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      // ---- Commands ----
      case cmd: DataWritingCommand => Seq(DataWritingCommandExec(cmd, planLater(cmd.query)))
      case cmd: RunnableCommand => Seq(ExecutedCommandExec(cmd))

      case MemoryPlan(sink, output) =>
        val serialize =
          ExpressionEncoder(DataTypeUtils.fromAttributes(output)).createSerializer()
        // Each serialized row is copied before it is collected.
        val rows = sink.allData.map(row => serialize(row).copy())
        Seq(LocalTableScanExec(output, rows, None))

      // ---- Operators that must not survive optimization ----
      case logical.Distinct(_) =>
        throw SparkException.internalError(
          "logical distinct operator should have been replaced by aggregate in the optimizer")
      case logical.Intersect(_, _, false) =>
        throw SparkException.internalError(
          "logical intersect  operator should have been replaced by semi-join in the optimizer")
      case logical.Intersect(_, _, true) =>
        throw SparkException.internalError(
          "logical intersect operator should have been replaced by union, aggregate" +
            " and generate operators in the optimizer")
      case logical.Except(_, _, false) =>
        throw SparkException.internalError(
          "logical except operator should have been replaced by anti-join in the optimizer")
      case logical.Except(_, _, true) =>
        throw SparkException.internalError(
          "logical except (all) operator should have been replaced by union, aggregate" +
            " and generate operators in the optimizer")
      case logical.ResolvedHint(_, _) =>
        throw SparkException.internalError(
          "ResolvedHint operator should have been replaced by join hint in the optimizer")
      case Deduplicate(_, input) if !input.isStreaming =>
        throw SparkException.internalError(
          "Deduplicate operator for non streaming data source should have been replaced " +
            "by aggregate in the optimizer")

      // ---- Object / typed operators (JVM, R and Python) ----
      case logical.DeserializeToObject(deserializer, objAttr, child) =>
        Seq(execution.DeserializeToObjectExec(deserializer, objAttr, planLater(child)))
      case logical.SerializeFromObject(serializer, child) =>
        Seq(execution.SerializeFromObjectExec(serializer, planLater(child)))
      case logical.MapPartitions(func, objAttr, child) =>
        Seq(execution.MapPartitionsExec(func, objAttr, planLater(child)))
      case logical.MapPartitionsInR(func, pkgs, bcast, inSchema, outSchema, objAttr, child) =>
        val wrapped = execution.r.MapPartitionsRWrapper(func, pkgs, bcast, inSchema, outSchema)
        Seq(execution.MapPartitionsExec(wrapped, objAttr, planLater(child)))
      case logical.FlatMapGroupsInR(
          func, pkgs, bcast, inSchema, outSchema, keyDeser, valueDeser, groupAttrs, dataAttrs,
          objAttr, child) =>
        Seq(execution.FlatMapGroupsInRExec(
          func, pkgs, bcast, inSchema, outSchema, keyDeser, valueDeser, groupAttrs, dataAttrs,
          objAttr, planLater(child)))
      case logical.FlatMapGroupsInRWithArrow(
          func, pkgs, bcast, inSchema, outType, keyDeser, groupAttrs, child) =>
        Seq(execution.FlatMapGroupsInRWithArrowExec(
          func, pkgs, bcast, inSchema, outType, keyDeser, groupAttrs, planLater(child)))
      case logical.MapPartitionsInRWithArrow(func, pkgs, bcast, inSchema, outType, child) =>
        Seq(execution.MapPartitionsInRWithArrowExec(
          func, pkgs, bcast, inSchema, outType, planLater(child)))
      case logical.FlatMapGroupsInPandas(groupAttrs, func, output, child) =>
        Seq(execution.python.FlatMapGroupsInPandasExec(
          groupAttrs, func, output, planLater(child)))
      case logical.FlatMapGroupsInArrow(groupAttrs, func, output, child) =>
        Seq(execution.python.FlatMapGroupsInArrowExec(
          groupAttrs, func, output, planLater(child)))
      case cg @ logical.FlatMapCoGroupsInPandas(_, _, func, output, lhs, rhs) =>
        Seq(execution.python.FlatMapCoGroupsInPandasExec(
          cg.leftAttributes, cg.rightAttributes,
          func, output, planLater(lhs), planLater(rhs)))
      case cg @ logical.FlatMapCoGroupsInArrow(_, _, func, output, lhs, rhs) =>
        Seq(execution.python.FlatMapCoGroupsInArrowExec(
          cg.leftAttributes, cg.rightAttributes,
          func, output, planLater(lhs), planLater(rhs)))
      case logical.MapInPandas(func, output, child, isBarrier, profile) =>
        Seq(execution.python.MapInPandasExec(
          func, output, planLater(child), isBarrier, profile))
      case logical.MapInArrow(func, output, child, isBarrier, profile) =>
        Seq(execution.python.MapInArrowExec(
          func, output, planLater(child), isBarrier, profile))
      case logical.AttachDistributedSequence(attr, child) =>
        Seq(execution.python.AttachDistributedSequenceExec(attr, planLater(child)))
      case logical.MapElements(func, _, _, objAttr, child) =>
        Seq(execution.MapElementsExec(func, objAttr, planLater(child)))
      case logical.AppendColumns(func, _, _, deserializer, serializer, child) =>
        Seq(execution.AppendColumnsExec(func, deserializer, serializer, planLater(child)))
      case logical.AppendColumnsWithObject(func, childSer, newSer, child) =>
        Seq(execution.AppendColumnsWithObjectExec(func, childSer, newSer, planLater(child)))
      case logical.MapGroups(
          func, keyDeser, valueDeser, groupAttrs, dataAttrs, dataOrder, objAttr, child) =>
        Seq(execution.MapGroupsExec(
          func, keyDeser, valueDeser, groupAttrs, dataAttrs, dataOrder, objAttr,
          planLater(child)))

      // ---- Stateful operators, planned for batch queries ----
      case logical.FlatMapGroupsWithState(
          func, keyDeser, valueDeser, groupAttrs, dataAttrs, outAttr, _, _, _,
          timeoutConf, withInitState, initGroupAttrs, initDataAttrs, initDeser,
          initState, child) =>
        val skipInitialKeys =
          conf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_SKIP_EMITTING_INITIAL_STATE_KEYS)
        Seq(FlatMapGroupsWithStateExec.generateSparkPlanForBatchQueries(
          func, keyDeser, valueDeser, initDeser, groupAttrs,
          initGroupAttrs, dataAttrs, initDataAttrs, outAttr, timeoutConf,
          withInitState, skipInitialKeys, planLater(initState), planLater(child)))
      case logical.TransformWithState(
          keyDeser, valueDeser, groupAttrs, dataAttrs, processor, timeMode, outMode, keyEnc,
          objAttr, child, withInitState, initGroupAttrs, initDataAttrs, initDeser,
          initState) =>
        Seq(TransformWithStateExec.generateSparkPlanForBatchQueries(
          keyDeser, valueDeser, groupAttrs, dataAttrs, processor, timeMode, outMode,
          keyEnc, objAttr, planLater(child), withInitState,
          initGroupAttrs, initDataAttrs, initDeser, planLater(initState)))
      case tws @ TransformWithStateInPySpark(
          func, _, outAttrs, outMode, timeMode, userDataType, child,
          withInitState, initState, _, initSchema) =>
        Seq(TransformWithStateInPySparkExec.generateSparkPlanForBatchQueries(
          func, tws.leftAttributes, outAttrs, outMode, timeMode, userDataType,
          planLater(child), withInitState, planLater(initState), tws.rightAttributes,
          initSchema))

      case _: FlatMapGroupsInPandasWithState =>
        // TODO(SPARK-40443): support applyInPandasWithState in batch query
        throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176")
      case logical.CoGroup(
          func, keyDeser, leftDeser, rightDeser, leftGroup, rightGroup, leftAttr, rightAttr,
          leftOrder, rightOrder, objAttr, lhs, rhs) =>
        Seq(execution.CoGroupExec(
          func, keyDeser, leftDeser, rightDeser, leftGroup, rightGroup, leftAttr, rightAttr,
          leftOrder, rightOrder, objAttr, planLater(lhs), planLater(rhs)))

      // ---- Relational operators ----
      // A shuffling repartition becomes an exchange; a non-shuffling one becomes a coalesce.
      case r @ logical.Repartition(_, true, child) =>
        Seq(ShuffleExchangeExec(r.partitioning, planLater(child), REPARTITION_BY_NUM))
      case logical.Repartition(numPartitions, false, child) =>
        Seq(execution.CoalesceExec(numPartitions, planLater(child)))
      case logical.Sort(ordering, global, child, _) =>
        Seq(execution.SortExec(ordering, global, planLater(child)))
      case logical.Project(projections, child) =>
        Seq(execution.ProjectExec(projections, planLater(child)))
      case logical.Filter(predicate, child) =>
        Seq(execution.FilterExec(predicate, planLater(child)))
      case tf: logical.TypedFilter =>
        Seq(execution.FilterExec(tf.typedCondition(tf.deserializer), planLater(tf.child)))
      case ex @ logical.Expand(_, _, child) =>
        Seq(execution.ExpandExec(ex.projections, ex.output, planLater(child)))
      case logical.Sample(lowerBound, upperBound, withReplacement, seed, child) =>
        Seq(execution.SampleExec(
          lowerBound, upperBound, withReplacement, seed, planLater(child)))
      case logical.LocalRelation(output, rows, _, stream) =>
        Seq(LocalTableScanExec(output, rows, stream))
      case logical.EmptyRelation(relation) => Seq(EmptyRelationExec(relation))
      case CommandResult(output, _, cmdPlan, rows) =>
        Seq(CommandResultExec(output, cmdPlan, rows))
      // We should match the combination of limit and offset first, to get the optimal physical
      // plan, instead of planning limit and offset separately.
      case LimitAndOffset(limit, offset, child) =>
        val local = LocalLimitExec(limit, planLater(child))
        Seq(GlobalLimitExec(limit, local, offset))
      case OffsetAndLimit(offset, limit, child) =>
        // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
        val combined = offset + limit
        Seq(GlobalLimitExec(combined, LocalLimitExec(combined, planLater(child)), offset))
      case logical.LocalLimit(IntegerLiteral(n), child) =>
        Seq(execution.LocalLimitExec(n, planLater(child)))
      case logical.GlobalLimit(IntegerLiteral(n), child) =>
        Seq(execution.GlobalLimitExec(n, planLater(child)))
      case logical.Offset(IntegerLiteral(skip), input) =>
        Seq(GlobalLimitExec(child = planLater(input), offset = skip))
      case union: logical.Union =>
        Seq(execution.UnionExec(union.children.map(planLater)))
      case loop @ logical.UnionLoop(id, anchor, recursion, limit) =>
        Seq(execution.UnionLoopExec(id, anchor, recursion, loop.output, limit))
      case gen @ logical.Generate(generator, _, outer, _, _, child) =>
        Seq(execution.GenerateExec(
          generator, gen.requiredChildOutput, outer,
          gen.qualifiedGeneratorOutput, planLater(child)))
      case _: logical.OneRowRelation =>
        Seq(execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation"))
      case range: logical.Range =>
        Seq(execution.RangeExec(range))
      case r: logical.RepartitionByExpression =>
        // Pick the shuffle origin from (no explicit partition count, no partition expressions).
        val origin = (r.optNumPartitions.isEmpty, r.partitionExpressions.isEmpty) match {
          case (true, true) => REBALANCE_PARTITIONS_BY_NONE
          case (true, false) => REPARTITION_BY_COL
          case (false, _) => REPARTITION_BY_NUM
        }
        Seq(exchange.ShuffleExchangeExec(
          r.partitioning, planLater(r.child), origin, r.optAdvisoryPartitionSize))
      case r: logical.RebalancePartitions =>
        val origin =
          if (r.partitionExpressions.nonEmpty) REBALANCE_PARTITIONS_BY_COL
          else REBALANCE_PARTITIONS_BY_NONE
        Seq(exchange.ShuffleExchangeExec(
          r.partitioning, planLater(r.child), origin, r.optAdvisoryPartitionSize))
      case ExternalRDD(objAttr, rdd) => Seq(ExternalRDDScanExec(objAttr, rdd))
      case existing: LogicalRDD =>
        Seq(RDDScanExec(
          existing.output, existing.rdd, "ExistingRDD", existing.outputPartitioning,
          existing.outputOrdering, existing.stream))
      case _: UpdateTable =>
        throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("UPDATE TABLE")
      case _: MergeIntoTable =>
        throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE")
      case logical.CollectMetrics(name, metrics, child, _) =>
        Seq(execution.CollectMetricsExec(name, metrics, planLater(child)))
      case WriteFiles(child, format, partitionCols, bucketSpec, options, staticParts) =>
        Seq(WriteFilesExec(
          planLater(child), format, partitionCols, bucketSpec, options, staticParts))
      case MultiResult(children) =>
        Seq(MultiResultExec(children.map(planLater)))
      // Anything not handled above produces no candidate plan.
      case _ => Nil
    }