in sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala [879:1082]
/**
 * Converts a single logical plan node into its physical counterpart by matching it against
 * a fixed, ordered list of cases.
 *
 * Every handled case produces a one-element list. Most cases hand the node's child plans to
 * `planLater` (defined outside this block) rather than converting them here. Operators that,
 * according to their error messages, should already have been rewritten by the optimizer
 * raise the error built by `SparkException.internalError`. `UpdateTable` and
 * `MergeIntoTable` raise the error built by
 * `QueryExecutionErrors.ddlUnsupportedTemporarilyError`.
 *
 * Case order is significant: the first matching case wins.
 *
 * @param plan the logical plan node to convert
 * @return a single-element list holding the physical plan, or `Nil` when no case matches
 * @throws SparkUnsupportedOperationException for `FlatMapGroupsInPandasWithState`
 */
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Commands: a DataWritingCommand is paired with its (later-planned) query, any other
// RunnableCommand is wrapped directly.
case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
// Serialize every element of sink.allData with an encoder derived from `output` and expose
// the rows as a local table scan.
// NOTE(review): the copy() presumably guards against the serializer reusing its result
// row between calls - confirm.
case MemoryPlan(sink, output) =>
val encoder = ExpressionEncoder(DataTypeUtils.fromAttributes(output))
val toRow = encoder.createSerializer()
LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy()), None) :: Nil
// The operators below are expected to have been replaced by the optimizer (see each
// message); meeting one here is reported as an internal error.
case logical.Distinct(child) =>
throw SparkException.internalError(
"logical distinct operator should have been replaced by aggregate in the optimizer")
case logical.Intersect(left, right, false) =>
throw SparkException.internalError(
"logical intersect operator should have been replaced by semi-join in the optimizer")
case logical.Intersect(left, right, true) =>
throw SparkException.internalError(
"logical intersect operator should have been replaced by union, aggregate" +
" and generate operators in the optimizer")
case logical.Except(left, right, false) =>
throw SparkException.internalError(
"logical except operator should have been replaced by anti-join in the optimizer")
case logical.Except(left, right, true) =>
throw SparkException.internalError(
"logical except (all) operator should have been replaced by union, aggregate" +
" and generate operators in the optimizer")
case logical.ResolvedHint(child, hints) =>
throw SparkException.internalError(
"ResolvedHint operator should have been replaced by join hint in the optimizer")
// Only a Deduplicate over a non-streaming child is rejected here; a streaming one does not
// match this guard and falls through to the remaining cases.
case Deduplicate(_, child) if !child.isStreaming =>
throw SparkException.internalError(
"Deduplicate operator for non streaming data source should have been replaced " +
"by aggregate in the optimizer")
// Object (de)serialization and partition/group mapping operators. Each case forwards the
// matched fields to the corresponding Exec node and defers the child via planLater.
case logical.DeserializeToObject(deserializer, objAttr, child) =>
execution.DeserializeToObjectExec(deserializer, objAttr, planLater(child)) :: Nil
case logical.SerializeFromObject(serializer, child) =>
execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil
case logical.MapPartitions(f, objAttr, child) =>
execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil
// The R variant reuses MapPartitionsExec, passing a MapPartitionsRWrapper built from the
// matched fields as the function.
case logical.MapPartitionsInR(f, p, b, is, os, objAttr, child) =>
execution.MapPartitionsExec(
execution.r.MapPartitionsRWrapper(f, p, b, is, os), objAttr, planLater(child)) :: Nil
case logical.FlatMapGroupsInR(f, p, b, is, os, key, value, grouping, data, objAttr, child) =>
execution.FlatMapGroupsInRExec(f, p, b, is, os, key, value, grouping,
data, objAttr, planLater(child)) :: Nil
case logical.FlatMapGroupsInRWithArrow(f, p, b, is, ot, key, grouping, child) =>
execution.FlatMapGroupsInRWithArrowExec(
f, p, b, is, ot, key, grouping, planLater(child)) :: Nil
case logical.MapPartitionsInRWithArrow(f, p, b, is, ot, child) =>
execution.MapPartitionsInRWithArrowExec(
f, p, b, is, ot, planLater(child)) :: Nil
case logical.FlatMapGroupsInPandas(grouping, func, output, child) =>
execution.python.FlatMapGroupsInPandasExec(grouping, func, output, planLater(child)) :: Nil
case logical.FlatMapGroupsInArrow(grouping, func, output, child) =>
execution.python.FlatMapGroupsInArrowExec(grouping, func, output, planLater(child)) :: Nil
// For the co-group variants the first two pattern fields are ignored; the left/right
// attributes are read from the matched node itself.
case f @ logical.FlatMapCoGroupsInPandas(_, _, func, output, left, right) =>
execution.python.FlatMapCoGroupsInPandasExec(
f.leftAttributes, f.rightAttributes,
func, output, planLater(left), planLater(right)) :: Nil
case f @ logical.FlatMapCoGroupsInArrow(_, _, func, output, left, right) =>
execution.python.FlatMapCoGroupsInArrowExec(
f.leftAttributes, f.rightAttributes,
func, output, planLater(left), planLater(right)) :: Nil
case logical.MapInPandas(func, output, child, isBarrier, profile) =>
execution.python.MapInPandasExec(func, output, planLater(child), isBarrier, profile) :: Nil
case logical.MapInArrow(func, output, child, isBarrier, profile) =>
execution.python.MapInArrowExec(func, output, planLater(child), isBarrier, profile) :: Nil
case logical.AttachDistributedSequence(attr, child) =>
execution.python.AttachDistributedSequenceExec(attr, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.AppendColumns(f, _, _, in, out, child) =>
execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
execution.AppendColumnsWithObjectExec(f, childSer, newSer, planLater(child)) :: Nil
case logical.MapGroups(f, key, value, grouping, data, order, objAttr, child) =>
execution.MapGroupsExec(
f, key, value, grouping, data, order, objAttr, planLater(child)
) :: Nil
// Stateful operators are planned through the generateSparkPlanForBatchQueries helpers.
// For FlatMapGroupsWithState, stateEncoder, outputMode and isFlatMapGroupsWithState are
// bound by the pattern but not forwarded; the skip-emitting flag is read from the SQL conf.
case logical.FlatMapGroupsWithState(
f, keyDeserializer, valueDeserializer, grouping, data, output, stateEncoder, outputMode,
isFlatMapGroupsWithState, timeout, hasInitialState, initialStateGroupAttrs,
initialStateDataAttrs, initialStateDeserializer, initialState, child) =>
val skipEmittingInitialStateKeys =
conf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_SKIP_EMITTING_INITIAL_STATE_KEYS)
FlatMapGroupsWithStateExec.generateSparkPlanForBatchQueries(
f, keyDeserializer, valueDeserializer, initialStateDeserializer, grouping,
initialStateGroupAttrs, data, initialStateDataAttrs, output, timeout,
hasInitialState, skipEmittingInitialStateKeys, planLater(initialState), planLater(child)
) :: Nil
case logical.TransformWithState(keyDeserializer, valueDeserializer, groupingAttributes,
dataAttributes, statefulProcessor, timeMode, outputMode, keyEncoder,
outputObjAttr, child, hasInitialState,
initialStateGroupingAttrs, initialStateDataAttrs,
initialStateDeserializer, initialState) =>
TransformWithStateExec.generateSparkPlanForBatchQueries(keyDeserializer, valueDeserializer,
groupingAttributes, dataAttributes, statefulProcessor, timeMode, outputMode,
keyEncoder, outputObjAttr, planLater(child), hasInitialState,
initialStateGroupingAttrs, initialStateDataAttrs,
initialStateDeserializer, planLater(initialState)) :: Nil
// The two ignored pattern fields are replaced by t.leftAttributes / t.rightAttributes
// taken from the matched node.
case t @ TransformWithStateInPySpark(
func, _, outputAttrs, outputMode, timeMode, userFacingDataType, child,
hasInitialState, initialState, _, initialStateSchema) =>
TransformWithStateInPySparkExec.generateSparkPlanForBatchQueries(func,
t.leftAttributes, outputAttrs, outputMode, timeMode, userFacingDataType,
planLater(child), hasInitialState, planLater(initialState), t.rightAttributes,
initialStateSchema) :: Nil
case _: FlatMapGroupsInPandasWithState =>
// TODO(SPARK-40443): support applyInPandasWithState in batch query
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176")
case logical.CoGroup(
f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, lOrder, rOrder, oAttr, left, right) =>
execution.CoGroupExec(
f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, lOrder, rOrder, oAttr,
planLater(left), planLater(right)) :: Nil
// With shuffle requested, a shuffle exchange using the node's partitioning is planned;
// otherwise a CoalesceExec with the requested partition count is used.
case r @ logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
ShuffleExchangeExec(r.partitioning, planLater(child), REPARTITION_BY_NUM) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
// Basic relational operators. The last field of Sort is ignored here.
case logical.Sort(sortExprs, global, child, _) =>
execution.SortExec(sortExprs, global, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
execution.ProjectExec(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.FilterExec(condition, planLater(child)) :: Nil
// A typed filter is planned as a plain FilterExec over the condition obtained from
// typedCondition applied to the node's own deserializer.
case f: logical.TypedFilter =>
execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data, _, stream) =>
LocalTableScanExec(output, data, stream) :: Nil
case logical.EmptyRelation(l) => EmptyRelationExec(l) :: Nil
case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
// We should match the combination of limit and offset first, to get the optimal physical
// plan, instead of planning limit and offset separately.
case LimitAndOffset(limit, offset, child) =>
GlobalLimitExec(limit,
LocalLimitExec(limit, planLater(child)), offset) :: Nil
case OffsetAndLimit(offset, limit, child) =>
// 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
GlobalLimitExec(offset + limit,
LocalLimitExec(offset + limit, planLater(child)), offset) :: Nil
// Stand-alone limit/offset operators; these only match when the value is an integer
// literal (IntegerLiteral extractor).
case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
// An offset on its own is a GlobalLimitExec given only child and offset; any other
// parameter keeps its default value.
case logical.Offset(IntegerLiteral(offset), child) =>
GlobalLimitExec(child = planLater(child), offset = offset) :: Nil
case union: logical.Union =>
execution.UnionExec(union.children.map(planLater)) :: Nil
// Unlike the other cases, anchor and recursion are handed over as-is, without planLater.
case u @ logical.UnionLoop(id, anchor, recursion, limit) =>
execution.UnionLoopExec(id, anchor, recursion, u.output, limit) :: Nil
case g @ logical.Generate(generator, _, outer, _, _, child) =>
execution.GenerateExec(
generator, g.requiredChildOutput, outer,
g.qualifiedGeneratorOutput, planLater(child)) :: Nil
// A one-row relation is an RDD scan with no output attributes over singleRowRdd
// (defined outside this block).
case _: logical.OneRowRelation =>
execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
case r: logical.Range =>
execution.RangeExec(r) :: Nil
// Shuffle origin selection:
//   no partition expressions and no partition count -> REBALANCE_PARTITIONS_BY_NONE
//   partition expressions but no partition count    -> REPARTITION_BY_COL
//   partition count given                           -> REPARTITION_BY_NUM
case r: logical.RepartitionByExpression =>
val shuffleOrigin = if (r.partitionExpressions.isEmpty && r.optNumPartitions.isEmpty) {
REBALANCE_PARTITIONS_BY_NONE
} else if (r.optNumPartitions.isEmpty) {
REPARTITION_BY_COL
} else {
REPARTITION_BY_NUM
}
exchange.ShuffleExchangeExec(
r.partitioning, planLater(r.child),
shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
// Rebalance picks its shuffle origin solely on whether partition expressions are present.
case r: logical.RebalancePartitions =>
val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
REBALANCE_PARTITIONS_BY_NONE
} else {
REBALANCE_PARTITIONS_BY_COL
}
exchange.ShuffleExchangeExec(
r.partitioning, planLater(r.child),
shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
// RDD-backed relations are scanned directly; LogicalRDD also forwards its partitioning,
// ordering and stream fields.
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering,
r.stream) :: Nil
// UPDATE and MERGE INTO reaching this strategy are rejected with the error built by
// ddlUnsupportedTemporarilyError.
case _: UpdateTable =>
throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("UPDATE TABLE")
case _: MergeIntoTable =>
throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE")
case logical.CollectMetrics(name, metrics, child, _) =>
execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil
case WriteFiles(child, fileFormat, partitionColumns, bucket, options, staticPartitions) =>
WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket, options,
staticPartitions) :: Nil
case MultiResult(children) =>
MultiResultExec(children.map(planLater)) :: Nil
// Nothing above matched: return no physical plan.
case _ => Nil
}