python/datafusion/plan.py (50 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """This module supports physical and logical plans in DataFusion.""" from __future__ import annotations from typing import TYPE_CHECKING, Any import datafusion._internal as df_internal if TYPE_CHECKING: from datafusion.context import SessionContext __all__ = [ "ExecutionPlan", "LogicalPlan", ] class LogicalPlan: """Logical Plan. A `LogicalPlan` is a node in a tree of relational operators (such as Projection or Filter). Represents transforming an input relation (table) to an output relation (table) with a potentially different schema. Plans form a dataflow tree where data flows from leaves up to the root to produce the query result. A `LogicalPlan` can be created by the SQL query planner, the DataFrame API, or programmatically (for example custom query languages). """ def __init__(self, plan: df_internal.LogicalPlan) -> None: """This constructor should not be called by the end user.""" self._raw_plan = plan def to_variant(self) -> Any: """Convert the logical plan into its specific variant.""" return self._raw_plan.to_variant() def inputs(self) -> list[LogicalPlan]: """Returns the list of inputs to the logical plan.""" return [LogicalPlan(p) for p in self._raw_plan.inputs()] def __repr__(self) -> str: """Generate a printable representation of the plan.""" return self._raw_plan.__repr__() def display(self) -> str: """Print the logical plan.""" return self._raw_plan.display() def display_indent(self) -> str: """Print an indented form of the logical plan.""" return self._raw_plan.display_indent() def display_indent_schema(self) -> str: """Print an indented form of the schema for the logical plan.""" return self._raw_plan.display_indent_schema() def display_graphviz(self) -> str: """Print the graph visualization of the logical plan. Returns a `format`able structure that produces lines meant for graphical display using the `DOT` language. This format can be visualized using software from [`graphviz`](https://graphviz.org/) """ return self._raw_plan.display_graphviz() @staticmethod def from_proto(ctx: SessionContext, data: bytes) -> LogicalPlan: """Create a LogicalPlan from protobuf bytes. Tables created in memory from record batches are currently not supported. """ return LogicalPlan(df_internal.LogicalPlan.from_proto(ctx.ctx, data)) def to_proto(self) -> bytes: """Convert a LogicalPlan to protobuf bytes. Tables created in memory from record batches are currently not supported. """ return self._raw_plan.to_proto() class ExecutionPlan: """Represent nodes in the DataFusion Physical Plan.""" def __init__(self, plan: df_internal.ExecutionPlan) -> None: """This constructor should not be called by the end user.""" self._raw_plan = plan def children(self) -> list[ExecutionPlan]: """Get a list of children `ExecutionPlan` that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins). """ return [ExecutionPlan(e) for e in self._raw_plan.children()] def display(self) -> str: """Print the physical plan.""" return self._raw_plan.display() def display_indent(self) -> str: """Print an indented form of the physical plan.""" return self._raw_plan.display_indent() def __repr__(self) -> str: """Print a string representation of the physical plan.""" return self._raw_plan.__repr__() @property def partition_count(self) -> int: """Returns the number of partitions in the physical plan.""" return self._raw_plan.partition_count @staticmethod def from_proto(ctx: SessionContext, data: bytes) -> ExecutionPlan: """Create an ExecutionPlan from protobuf bytes. Tables created in memory from record batches are currently not supported. """ return ExecutionPlan(df_internal.ExecutionPlan.from_proto(ctx.ctx, data)) def to_proto(self) -> bytes: """Convert an ExecutionPlan into protobuf bytes. Tables created in memory from record batches are currently not supported. """ return self._raw_plan.to_proto()