smallpond/logical/node.py (1,198 lines of code)

import copy import math import os import os.path import re import traceback import warnings from collections import defaultdict from typing import ( Callable, Dict, Generic, Iterable, List, Literal, Optional, Tuple, TypeVar, Union, ) import numpy as np import pandas as pd import pyarrow as arrow from graphviz import Digraph from smallpond.common import ( DATA_PARTITION_COLUMN_NAME, DEFAULT_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, GB, GENERATED_COLUMNS, ) from smallpond.execution.task import ( ArrowBatchTask, ArrowComputeTask, ArrowStreamTask, DataSinkTask, DataSourceTask, EvenlyDistributedPartitionProducerTask, HashPartitionTask, LoadPartitionedDataSetProducerTask, MergeDataSetsTask, PandasBatchTask, PandasComputeTask, PartitionConsumerTask, PartitionInfo, PartitionProducerTask, PerfStats, ProjectionTask, PythonScriptTask, RepeatPartitionProducerTask, RuntimeContext, SplitDataSetTask, SqlEngineTask, Task, UserDefinedPartitionProducerTask, ) from smallpond.logical.dataset import DataSet, ParquetDataSet from smallpond.logical.udf import ( DuckDbExtensionContext, ExternalModuleContext, PythonUDFContext, UDFContext, UDFType, UserDefinedFunction, ) class NodeId(int): """ A unique identifier for each node. """ def __str__(self) -> str: return f"{self:06d}" class Context(object): """ Global context for each logical plan. Right now it's mainly used to keep a list of Python UDFs. """ def __init__(self) -> None: self.next_node_id = 0 self.udfs: Dict[str, UDFContext] = {} def _new_node_id(self) -> NodeId: """ Generate a new node id. """ self.next_node_id += 1 return NodeId(self.next_node_id) def create_function( self, name: str, func: Callable, params: Optional[List[UDFType]], return_type: Optional[UDFType], use_arrow_type=False, ) -> str: """ Define a Python UDF to be referenced in the logical plan. Currently only scalar functions (return one element per row) are supported. See https://duckdb.org/docs/archive/0.9.2/api/python/function. Parameters ---------- name A unique function name, which can be referenced in SQL query. func The Python function you wish to register as a UDF. params A list of column types for function parameters, including basic types: `UDFType.INTEGER`, `UDFType.FLOAT`, `UDFType.VARCHAR`, `UDFType.BLOB` etc, and container types: ``` UDFListType(UDFType.INTEGER), UDFMapType(UDFType.VARCHAR, UDFType.INTEGER), UDFListType(UDFStructType({'int': 'INTEGER', 'str': 'VARCHAR'})). ``` These types are simple wrappers of duckdb types defined in https://duckdb.org/docs/api/python/types.html. Set params to `UDFAnyParameters()` allows the udf to accept parameters of any type. use_arrow_type, optional Specify true to use PyArrow Tables, by default use built-in Python types. return_type The return type of the function, see the above note for `params`. Returns ------- The unique function name. """ self.udfs[name] = PythonUDFContext(name, func, params, return_type, use_arrow_type) return name def create_external_module(self, module_path: str, name: str = None) -> str: """ Load an external DuckDB module. """ name = name or os.path.basename(module_path) self.udfs[name] = ExternalModuleContext(name, module_path) return name def create_duckdb_extension(self, extension_path: str, name: str = None) -> str: """ Load a DuckDB extension. """ name = name or os.path.basename(extension_path) self.udfs[name] = DuckDbExtensionContext(name, extension_path) return name class Node(object): """ The base class for all nodes. 
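    Examples
    --------
    A minimal sketch of how nodes compose into a plan (the input path and query are illustrative only):

    ```
    ctx = Context()
    source = DataSourceNode(ctx, ParquetDataSet(["data/*.parquet"]))   # hypothetical input files
    result = SqlEngineNode(ctx, (source,), "select count(*) from {0}")
    plan = LogicalPlan(ctx, result)
    ```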
""" enable_resource_boost = False def __init__( self, ctx: Context, input_deps: "Tuple[Node, ...]", output_name: Optional[str] = None, output_path: Optional[str] = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: Optional[int] = None, ) -> None: """ The base class for all nodes in logical plan. Parameters ---------- ctx The context of logical plan. input_deps Define the inputs of this node. output_name, optional The prefix of output directories and filenames for tasks generated from this node. The default `output_name` is the class name of the task created for this node, e.g. `HashPartitionTask, SqlEngineTask, PythonScriptTask`, etc. The `output_name` string should only include alphanumeric characters and underscore. In other words, it matches regular expression `[a-zA-Z0-9_]+`. If `output_name` is set and `output_path` is None, the path format of output files is: `{job_root_path}/output/{output_name}/{task_runtime_id}/{output_name}-{task_runtime_id}-{seqnum}.parquet` where `{task_runtime_id}` is defined as `{job_id}.{task_id}.{sched_epoch}.{task_retry_count}`. output_path, optional The absolute path of a customized output folder for tasks generated from this node. Any shared folder that can be accessed by executor and scheduler is allowed although IO performance varies across filesystems. If both `output_name` and `output_path` are specified, the path format of output files is: `{output_path}/{output_name}/{task_runtime_id}/{output_name}-{task_runtime_id}-{seqnum}.parquet` where `{task_runtime_id}` is defined as `{job_id}.{task_id}.{sched_epoch}.{task_retry_count}`. cpu_limit, optional The max number of CPUs would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. gpu_limit, optional The max number of GPUs would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. memory_limit, optional The max memory would be used by tasks generated from this node. The memory limit is automatically calculated based memory-to-cpu ratio of executor machine if not specified. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. 
""" assert isinstance(input_deps, Iterable), f"input_deps is not iterable: {input_deps}" assert all(isinstance(node, Node) for node in input_deps), f"some of input_deps are not instances of Node: {input_deps}" assert output_name is None or re.match("[a-zA-Z0-9_]+", output_name), f"output_name has invalid format: {output_name}" assert output_path is None or os.path.isabs(output_path), f"output_path is not an absolute path: {output_path}" self.ctx = ctx self.id = self.ctx._new_node_id() self.input_deps = input_deps self.output_name = output_name self.output_path = output_path self.cpu_limit = max(cpu_limit, gpu_limit * 8) self.gpu_limit = gpu_limit self.memory_limit = memory_limit self.generated_tasks: List[str] = [] self.perf_stats: Dict[str, PerfStats] = {} self.perf_metrics: Dict[str, List[float]] = defaultdict(list) # record the location where the node is constructed in user code frame = next( frame for frame in reversed(traceback.extract_stack()) if frame.filename != __file__ and not frame.filename.endswith("/dataframe.py") ) self.location = f"{frame.filename}:{frame.lineno}" def __repr__(self) -> str: return f"{self.__class__.__name__}-{self.id}" def __str__(self) -> str: return ( f"{repr(self)}: input_deps={self.input_deps}, output_name={self.output_name}, " f"tasks[{len(self.generated_tasks)}]={self.generated_tasks[:1]}...{self.generated_tasks[-1:]}, " f"resource_limit={self.cpu_limit}CPUs/{self.gpu_limit}GPUs/{(self.memory_limit or 0)//GB}GB" ) @staticmethod def task_factory(task_builder): def wrapper(self: Node, *args, **kwargs): task: Task = task_builder(self, *args, **kwargs) task.node_id = self.id task.location = self.location self.generated_tasks.append(task.key) return task return wrapper def slim_copy(self): node = copy.copy(self) del node.input_deps, node.generated_tasks return node def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> Task: raise NotImplementedError def add_perf_metrics(self, name, value: Union[List[float], float]): self.perf_metrics[name].append(value) self.perf_stats.pop(name, None) def get_perf_stats(self, name): if name not in self.perf_stats: if name not in self.perf_metrics: return None values = self.perf_metrics[name] min, max, avg = np.min(values), np.max(values), np.average(values) p50, p75, p95, p99 = np.percentile(values, (50, 75, 95, 99)) self.perf_stats[name] = PerfStats(len(values), sum(values), min, max, avg, p50, p75, p95, p99) return self.perf_stats[name] @property def num_partitions(self) -> int: raise NotImplementedError("num_partitions") class DataSourceNode(Node): """ All inputs of a logical plan are represented as `DataSourceNode`. It does not depend on any other node. """ def __init__(self, ctx: Context, dataset: DataSet) -> None: """ Construct a DataSourceNode. See :func:`Node.__init__` to find comments on other parameters. Parameters ---------- dataset A DataSet instance serving as a input of the plan. Set to `None` to create a dummy data source. 
""" assert dataset is None or isinstance(dataset, DataSet) super().__init__(ctx, []) self.dataset = dataset if dataset is not None else ParquetDataSet([]) def __str__(self) -> str: return super().__str__() + f", dataset=<{self.dataset}>" @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> DataSourceTask: return DataSourceTask(runtime_ctx, self.dataset, partition_infos) @property def num_partitions(self) -> int: return 1 DataSinkType = Literal["link", "copy", "link_or_copy", "manifest"] class DataSinkNode(Node): """ Collect the output files of `input_deps` to `output_path`. Depending on the options, it may create hard links, symbolic links, manifest files, or copy files. """ def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], output_path: str, type: DataSinkType = "link", manifest_only=False, is_final_node=False, ) -> None: """ Construct a DataSinkNode. See :func:`Node.__init__` to find comments on other parameters. Parameters ---------- output_path The absolute path of a customized output folder. If set to None, an output folder would be created under the default output root. Any shared folder that can be accessed by executor and scheduler is allowed although IO performance varies across filesystems. type, optional The operation type of the sink node. "link" (default): If an output file is under the same mount point as `output_path`, a hard link is created; otherwise a symlink. "copy": Copies files to the output path. "link_or_copy": If an output file is under the same mount point as `output_path`, creates a hard link; otherwise copies the file. "manifest": Creates a manifest file under `output_path`. Every line of the manifest file is a path string. manifest_only, optional, deprecated Set type to "manifest". """ assert type in ( "link", "copy", "link_or_copy", "manifest", ), f"invalid sink type: {type}" super().__init__(ctx, input_deps, None, output_path, cpu_limit=1, gpu_limit=0, memory_limit=0) self.type: DataSinkType = "manifest" if manifest_only else type self.is_final_node = is_final_node def __str__(self) -> str: return super().__str__() + f", output_path={self.output_path}, type={self.type}" @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> DataSinkTask: # design considerations: # 1. data copy should start as soon as possible. # 2. file names may conflict across partitions of different tasks. # we should rename files **if and only if** there are conflicts. # 3. resolving conflicts requires a single task. 
if self.type == "copy" or self.type == "link_or_copy": # so we create two phase tasks: # phase1: copy data to a temp directory, for each input partition in parallel input_deps = [self._create_phase1_task(runtime_ctx, task, [PartitionInfo(i, len(input_deps))]) for i, task in enumerate(input_deps)] # phase2: resolve file name conflicts, hard link files, create manifest file, and clean up temp directory return DataSinkTask( runtime_ctx, input_deps, [PartitionInfo()], self.output_path, type="link_manifest", is_final_node=self.is_final_node, ) elif self.type == "link": return DataSinkTask( runtime_ctx, input_deps, [PartitionInfo()], self.output_path, type="link_manifest", is_final_node=self.is_final_node, ) elif self.type == "manifest": return DataSinkTask( runtime_ctx, input_deps, [PartitionInfo()], self.output_path, type="manifest", is_final_node=self.is_final_node, ) else: raise ValueError(f"invalid sink type: {self.type}") @Node.task_factory def _create_phase1_task( self, runtime_ctx: RuntimeContext, input_dep: Task, partition_infos: List[PartitionInfo], ) -> DataSinkTask: return DataSinkTask(runtime_ctx, [input_dep], partition_infos, self.output_path, type=self.type) class PythonScriptNode(Node): """ Run Python code to process the input datasets with `PythonScriptNode.process(...)`. If the code needs to access attributes of runtime task, e.g. `local_rank`, `random_seed_long`, `numpy_random_gen`, - create a subclass of `PythonScriptTask`, which implements `PythonScriptTask.process(...)`, - override `PythonScriptNode.spawn(...)` and return an instance of the subclass. Or use `runtime_ctx.task` in `process(runtime_ctx: RuntimeContext, ...)` function. """ def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Optional[Callable[[RuntimeContext, List[DataSet], str], bool]] = None, output_name: Optional[str] = None, output_path: Optional[str] = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: Optional[int] = None, ): """ Construct a PythonScriptNode. See :func:`Node.__init__` to find comments on other parameters. Parameters ---------- process_func, optional User-defined process function, which should have the same signature as `self.process(...)`. If user-defined function has extra parameters, use `functools.partial(...)` to bind arguments. See `test_partial_process_func` in `test/test_execution.py` for examples of usage. """ super().__init__( ctx, input_deps, output_name, output_path, cpu_limit, gpu_limit, memory_limit, ) self.process_func = process_func def __str__(self) -> str: return super().__str__() + f", process_func={self.process_func}" @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> PythonScriptTask: return self.spawn( runtime_ctx, input_deps, partition_infos, self.process_func or self.slim_copy().process, # warn: do not call self.slim_copy() in __init__ as attributes may not be fully initialized self.output_name, self.output_path, self.cpu_limit, self.gpu_limit, self.memory_limit, ) def spawn(self, *args, **kwargs) -> PythonScriptTask: """ Return an instance of subclass of `PythonScriptTask`. The subclass should override `PythonScriptTask.process(...)`. 
Examples -------- ``` class OutputMsgPythonTask(PythonScriptTask): def __init__(self, msg: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.msg = msg def process(self, runtime_ctx: RuntimeContext, input_datasets: List[DataSet], output_path: str) -> bool: fout = (Path(output_path) / f"{self.output_filename}-{self.local_rank}.SUCCESS") fout.write_text(f"msg: {self.msg}, seed: {self.random_seed_uint32}, rank: {self.local_rank}") return True class OutputMsgPythonNode(PythonScriptNode): def spawn(self, *args, **kwargs) -> OutputMsgPythonTask: return OutputMsgPythonTask("python script", *args, **kwargs) ``` """ return PythonScriptTask(*args, **kwargs) def process( self, runtime_ctx: RuntimeContext, input_datasets: List[DataSet], output_path: str, ) -> bool: """ Put user-defined code here. Parameters ---------- runtime_ctx The runtime context, which defines a few global configuration info. input_datasets A list of input datasets. The number of datasets equal to the number of input_deps. output_path The absolute path of output directory created for each task generated from this node. The outputs generated by this node would be consumed by tasks of downstream nodes. Returns ------- Return true if success. Return false or throw an exception if there is any error. """ raise NotImplementedError class ArrowComputeNode(Node): """ Run Python code to process the input datasets, which have been loaded as Apache Arrow tables. See https://arrow.apache.org/docs/python/generated/pyarrow.Table.html. If the code needs to access attributes of runtime task, e.g. `local_rank`, `random_seed_long`, `numpy_random_gen`, - create a subclass of `ArrowComputeTask`, which implements `ArrowComputeTask.process(...)`, - override `ArrowComputeNode.spawn(...)` and return an instance of the subclass. Or use `runtime_ctx.task` in `process(runtime_ctx: RuntimeContext, ...)` function. """ default_row_group_size = DEFAULT_ROW_GROUP_SIZE def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[arrow.Table]], arrow.Table] = None, parquet_row_group_size: int = None, parquet_dictionary_encoding=False, parquet_compression="ZSTD", parquet_compression_level=3, use_duckdb_reader=False, output_name: str = None, output_path: str = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: Optional[int] = None, ) -> None: """ Construct a ArrowComputeNode. See :func:`Node.__init__` to find comments on other parameters. Parameters ---------- process_func, optional User-defined process function, which should have the same signature as `self.process(...)`. If user-defined function has extra parameters, use `functools.partial(...)` to bind arguments. See `test_partial_process_func` in `test/test_execution.py` for examples of usage. parquet_row_group_size, optional The number of rows stored in each row group of parquet file. Large row group size provides more opportunities to compress the data. Small row groups size could make filtering rows faster and achieve high concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size. parquet_dictionary_encoding, optional Specify if we should use dictionary encoding in general or only for some columns. See `use_dictionary` in https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html. use_duckdb_reader, optional Use duckdb (instead of pyarrow parquet module) to load parquet files as arrow table. 
cpu_limit, optional The max number of CPUs would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. gpu_limit, optional The max number of GPUs would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. memory_limit, optional The max memory would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. """ super().__init__( ctx, input_deps, output_name, output_path, cpu_limit, gpu_limit, memory_limit, ) self.parquet_row_group_size = parquet_row_group_size or self.default_row_group_size self.parquet_dictionary_encoding = parquet_dictionary_encoding self.parquet_compression = parquet_compression self.parquet_compression_level = parquet_compression_level self.use_duckdb_reader = use_duckdb_reader self.process_func = process_func def __str__(self) -> str: return super().__str__() + f", process_func={self.process_func}" @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> ArrowComputeTask: return self.spawn( runtime_ctx, input_deps, partition_infos, self.process_func or self.slim_copy().process, # warn: do not call self.slim_copy() in __init__ as attributes may not be fully initialized self.parquet_row_group_size, self.parquet_dictionary_encoding, self.parquet_compression, self.parquet_compression_level, self.use_duckdb_reader, self.output_name, self.output_path, self.cpu_limit, self.gpu_limit, self.memory_limit, ) def spawn(self, *args, **kwargs) -> ArrowComputeTask: """ Return an instance of subclass of `ArrowComputeTask`. The subclass should override `ArrowComputeTask.process(...)`. Examples -------- ``` class CopyInputArrowTask(ArrowComputeTask): def __init__(self, msg: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.msg = msg def process(self, runtime_ctx: RuntimeContext, input_tables: List[arrow.Table]) -> arrow.Table: return input_tables[0] class CopyInputArrowNode(ArrowComputeNode): def spawn(self, *args, **kwargs) -> CopyInputArrowTask: return CopyInputArrowTask("arrow compute", *args, **kwargs) ``` """ return ArrowComputeTask(*args, **kwargs) def process(self, runtime_ctx: RuntimeContext, input_tables: List[arrow.Table]) -> arrow.Table: """ Put user-defined code here. Parameters ---------- runtime_ctx The runtime context, which defines a few global configuration info. input_datasets A list of arrow tables. The number of arrow tables equal to the number of input_deps. Returns ------- Return the output as a arrow table. Throw an exception if there is any error. """ raise NotImplementedError class ArrowStreamNode(Node): """ Run Python code to process the input datasets, which have been loaded as RecordBatchReader. See https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html. If the code needs to access attributes of runtime task, e.g. `local_rank`, `random_seed_long`, `numpy_random_gen`, - create a subclass of `ArrowStreamTask`, which implements `ArrowStreamTask.process(...)`, - override `ArrowStreamNode.spawn(...)` and return an instance of the subclass. Or use `runtime_ctx.task` in `process(runtime_ctx: RuntimeContext, ...)` function. 
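    Examples
    --------
    A minimal sketch using `process_func` instead of subclassing (`upstream` and the `score` column are hypothetical):

    ```
    import pyarrow.compute as pc

    def keep_positive(runtime_ctx: RuntimeContext, input_readers: List[arrow.RecordBatchReader]) -> Iterable[arrow.Table]:
        # stream over the first input and keep only rows with a positive score
        for batch in input_readers[0]:
            table = arrow.Table.from_batches([batch])
            yield table.filter(pc.greater(table["score"], 0))

    node = ArrowStreamNode(ctx, (upstream,), process_func=keep_positive)
    ```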
""" default_batch_size = DEFAULT_BATCH_SIZE default_row_group_size = DEFAULT_ROW_GROUP_SIZE default_secs_checkpoint_interval = 180 def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], *, process_func: Callable[[RuntimeContext, List[arrow.RecordBatchReader]], Iterable[arrow.Table]] = None, background_io_thread=True, streaming_batch_size: int = None, secs_checkpoint_interval: int = None, parquet_row_group_size: int = None, parquet_dictionary_encoding=False, parquet_compression="ZSTD", parquet_compression_level=3, use_duckdb_reader=False, output_name: str = None, output_path: str = None, cpu_limit: int = 1, gpu_limit: float = 0, memory_limit: Optional[int] = None, ) -> None: """ Construct a ArrowStreamNode. See :func:`Node.__init__` to find comments on other parameters. Parameters ---------- process_func, optional User-defined process function, which should have the same signature as `self.process(...)`. If user-defined function has extra parameters, use `functools.partial(...)` to bind arguments. See `test_partial_process_func` in `test/test_execution.py` for examples of usage. background_io_thread, optional Create a background IO thread for read/write. streaming_batch_size, optional Split the input datasets into batches, each of which has length less or equal to `streaming_batch_size`. secs_checkpoint_interval, optional Create a checkpoint of the stream task every `secs_checkpoint_interval` seconds. parquet_row_group_size, optional The number of rows stored in each row group of parquet file. Large row group size provides more opportunities to compress the data. Small row groups size could make filtering rows faster and achieve high concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size. parquet_dictionary_encoding, optional Specify if we should use dictionary encoding in general or only for some columns. See `use_dictionary` in https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html. use_duckdb_reader, optional Use duckdb (instead of pyarrow parquet module) to load parquet files as arrow table. cpu_limit, optional The max number of CPUs would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. gpu_limit, optional The max number of GPUs would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. memory_limit, optional The max memory would be used by tasks generated from this node. This is a resource requirement specified by the user and used to guide task scheduling. smallpond does NOT enforce this limit. 
""" super().__init__( ctx, input_deps, output_name, output_path, cpu_limit, gpu_limit, memory_limit, ) self.background_io_thread = background_io_thread and self.cpu_limit > 1 self.streaming_batch_size = streaming_batch_size or self.default_batch_size self.secs_checkpoint_interval = secs_checkpoint_interval or math.ceil( self.default_secs_checkpoint_interval / min(6, self.gpu_limit + 2, self.cpu_limit) ) self.parquet_row_group_size = parquet_row_group_size or self.default_row_group_size self.parquet_dictionary_encoding = parquet_dictionary_encoding self.parquet_compression = parquet_compression self.parquet_compression_level = parquet_compression_level self.use_duckdb_reader = use_duckdb_reader self.process_func = process_func def __str__(self) -> str: return ( super().__str__() + f", process_func={self.process_func}, background_io_thread={self.background_io_thread}, streaming_batch_size={self.streaming_batch_size}, checkpoint_interval={self.secs_checkpoint_interval}s" ) @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> ArrowStreamTask: return self.spawn( runtime_ctx, input_deps, partition_infos, self.process_func or self.slim_copy().process, # warn: do not call self.slim_copy() in __init__ as attributes may not be fully initialized self.background_io_thread, self.streaming_batch_size, self.secs_checkpoint_interval, self.parquet_row_group_size, self.parquet_dictionary_encoding, self.parquet_compression, self.parquet_compression_level, self.use_duckdb_reader, self.output_name, self.output_path, self.cpu_limit, self.gpu_limit, self.memory_limit, ) def spawn(self, *args, **kwargs) -> ArrowStreamTask: """ Return an instance of subclass of `ArrowStreamTask`. The subclass should override `ArrowStreamTask.process(...)`. Examples -------- ``` class CopyInputStreamTask(ArrowStreamTask): def __init__(self, msg: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.msg = msg def process(self, runtime_ctx: RuntimeContext, input_readers: List[arrow.RecordBatchReader]) -> Iterable[arrow.Table]: for batch in input_readers[0]: yield arrow.Table.from_batches([batch]) class CopyInputStreamNode(ArrowStreamNode): default_batch_size = 10 def spawn(self, *args, **kwargs) -> CopyInputStreamTask: return CopyInputStreamTask("arrow stream", *args, **kwargs) ``` """ return ArrowStreamTask(*args, **kwargs) def process(self, runtime_ctx: RuntimeContext, input_readers: List[arrow.RecordBatchReader]) -> Iterable[arrow.Table]: """ Put user-defined code here. Parameters ---------- runtime_ctx The runtime context, which defines a few global configuration info. input_readers A list of RecordBatchReader. The number of readers equal to the number of input_deps. Returns ------- Return the output as a arrow table. Throw an exception if there is any error. """ raise NotImplementedError class ArrowBatchNode(ArrowStreamNode): """ Run user-defined code to process the input datasets as a series of arrow tables. """ def spawn(self, *args, **kwargs) -> ArrowBatchTask: return ArrowBatchTask(*args, **kwargs) def process(self, runtime_ctx: RuntimeContext, input_tables: List[arrow.Table]) -> arrow.Table: raise NotImplementedError class PandasComputeNode(ArrowComputeNode): """ Run Python code to process the input datasets as a single pandas DataFrame. 
""" def spawn(self, *args, **kwargs) -> PandasComputeTask: return PandasComputeTask(*args, **kwargs) def process(self, runtime_ctx: RuntimeContext, input_dfs: List[pd.DataFrame]) -> pd.DataFrame: raise NotImplementedError class PandasBatchNode(ArrowStreamNode): """ Run Python code to process the input datasets as a series of pandas DataFrames. """ def spawn(self, *args, **kwargs) -> PandasBatchTask: return PandasBatchTask(*args, **kwargs) def process(self, runtime_ctx: RuntimeContext, input_dfs: List[pd.DataFrame]) -> pd.DataFrame: raise NotImplementedError class SqlEngineNode(Node): """ Run SQL query against the outputs of input_deps. """ max_udf_cpu_limit = 3 default_cpu_limit = 1 default_memory_limit = None default_row_group_size = DEFAULT_ROW_GROUP_SIZE enable_resource_boost = True def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], sql_query: Union[str, Iterable[str]], *, udfs: List[Union[str, UserDefinedFunction]] = None, per_thread_output=True, materialize_output=True, materialize_in_memory=False, relax_memory_if_oom=None, batched_processing=False, extension_paths: List[str] = None, udf_module_paths: List[str] = None, enable_temp_directory=False, parquet_row_group_size: int = None, parquet_dictionary_encoding: bool = False, parquet_compression="ZSTD", parquet_compression_level=3, output_name: Optional[str] = None, output_path: Optional[str] = None, cpu_limit: Optional[int] = None, memory_limit: Optional[int] = None, cpu_overcommit_ratio: float = 1.0, memory_overcommit_ratio: float = 0.9, ) -> None: """ Construct a SqlEngineNode. See :func:`Node.__init__` to find comments on other parameters. Parameters ---------- sql_query SQL query string or a list of query strings, currently DuckDB query syntax is supported, see https://duckdb.org/docs/sql/query_syntax/select. All queries are executed. But only the results of the last query is persisted as the output. The output dataset of each `input_deps` can be referenced as `{0}`, `{1}`, `{2}`, etc. For example, the following query counts the total number of product items from `{0}` that have `category_id` included in `{1}`. .. code-block:: select count(product_item.id) from {0} where product_item.id > 0 and product_item.category_id in ( select category_id from {1} ) The following placeholders are supported in the query: - `{batch_index}`: the index of the current batch. - `{query_index}`: the index of the current query. - `{rand_seed}`: the random seed of the current query. - `{__data_partition__}`: the index of the current data partition. udfs, optional A list of user-defined functions to be referenced in `sql_query`. Each element can be one of the following: - A `@udf` decorated function. - A path to a duckdb extension file, e.g. `path/to/udf.duckdb_extension`. - A string returned by `ctx.create_function()` or `ctx.create_duckdb_extension()`. If `udfs` is not empty, the resource requirement is downgraded to `min(cpu_limit, 3)` and `min(memory_limit, 50*GB)` since UDF execution in duckdb is not highly paralleled. per_thread_output, optional If the final number of Parquet files is not important, writing one file per thread can significantly improve performance. Also see https://duckdb.org/docs/data/parquet/tips.html#enabling-per_thread_output. materialize_output, optional Query result is materialized to the underlying filesystem as parquet files if enabled. materialize_in_memory, optional Materialize query result in memory before writing to the underlying filesystem, by default False. 
relax_memory_if_oom, optional Double the memory limit and retry if sql engine OOM, by default False. batched_processing, optional Split input dataset into multiple batches, each of which fits into memory limit, and then run sql query against each batch. Enabled only if `len(input_deps) == 1`. extension_paths, optional A list of duckdb extension paths to be loaded at runtime. enable_temp_directory, optional Write temp files when memory is low, by default False. parquet_row_group_size, optional The number of rows stored in each row group of parquet file. Large row group size provides more opportunities to compress the data. Small row groups size could make filtering rows faster and achieve high concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size. parquet_dictionary_encoding, optional Specify if we should use dictionary encoding in general or only for some columns. When encoding the column, if the dictionary size is too large, the column will fallback to PLAIN encoding. By default, dictionary encoding is enabled for all columns. Set it to False to disable dictionary encoding, or pass in column names to enable it only for specific columns. eg: parquet_dictionary_encoding=['column_1'] cpu_limit, optional The max number of CPUs used by the SQL engine. memory_limit, optional The max memory used by the SQL engine. cpu_overcommit_ratio, optional The effective number of threads used by the SQL engine is: `cpu_limit * cpu_overcommit_ratio`. memory_overcommit_ratio, optional The effective size of memory used by the SQL engine is: `memory_limit * memory_overcommit_ratio`. """ cpu_limit = cpu_limit or self.default_cpu_limit memory_limit = memory_limit or self.default_memory_limit if udfs is not None: if self.max_udf_cpu_limit is not None and cpu_limit > self.max_udf_cpu_limit: warnings.warn(f"UDF execution is not highly paralleled, downgrade cpu_limit from {cpu_limit} to {self.max_udf_cpu_limit}") cpu_limit = self.max_udf_cpu_limit memory_limit = None if relax_memory_if_oom is not None: warnings.warn( "Argument 'relax_memory_if_oom' has been deprecated", DeprecationWarning, stacklevel=3, ) assert isinstance(sql_query, str) or (isinstance(sql_query, Iterable) and all(isinstance(q, str) for q in sql_query)) super().__init__( ctx, input_deps, output_name, output_path, cpu_limit=cpu_limit, memory_limit=memory_limit, ) self.sql_queries = [sql_query] if isinstance(sql_query, str) else list(sql_query) self.udfs = [ctx.create_duckdb_extension(path) for path in extension_paths or []] + [ ctx.create_external_module(path) for path in udf_module_paths or [] ] for udf in udfs or []: if isinstance(udf, UserDefinedFunction): name = ctx.create_function(udf.name, udf.func, udf.params, udf.return_type, udf.use_arrow_type) else: assert isinstance(udf, str), f"udf must be a string: {udf}" if udf in ctx.udfs: name = udf elif udf.endswith(".duckdb_extension"): name = ctx.create_duckdb_extension(udf) elif udf.endswith(".so"): name = ctx.create_external_module(udf) else: raise ValueError(f"invalid udf: {udf}") self.udfs.append(name) self.per_thread_output = per_thread_output self.materialize_output = materialize_output self.materialize_in_memory = materialize_in_memory self.batched_processing = batched_processing and len(input_deps) == 1 self.enable_temp_directory = enable_temp_directory self.parquet_row_group_size = parquet_row_group_size or self.default_row_group_size self.parquet_dictionary_encoding = parquet_dictionary_encoding self.parquet_compression = parquet_compression 
self.parquet_compression_level = parquet_compression_level self.cpu_overcommit_ratio = cpu_overcommit_ratio self.memory_overcommit_ratio = memory_overcommit_ratio def __str__(self) -> str: return super().__str__() + f", sql_query=<{self.oneline_query[:100]}...>, udfs={self.udfs}, batched_processing={self.batched_processing}" @property def oneline_query(self) -> str: return "; ".join(" ".join(filter(None, map(str.strip, query.splitlines()))) for query in self.sql_queries) @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> SqlEngineTask: return self.spawn( runtime_ctx, input_deps, partition_infos, self.sql_queries, udfs=[self.ctx.udfs[name] for name in self.udfs], per_thread_output=self.per_thread_output, materialize_output=self.materialize_output, materialize_in_memory=self.materialize_in_memory, batched_processing=self.batched_processing, enable_temp_directory=self.enable_temp_directory, parquet_row_group_size=self.parquet_row_group_size, parquet_dictionary_encoding=self.parquet_dictionary_encoding, parquet_compression=self.parquet_compression, parquet_compression_level=self.parquet_compression_level, output_name=self.output_name, output_path=self.output_path, cpu_limit=self.cpu_limit, gpu_limit=self.gpu_limit, memory_limit=self.memory_limit, cpu_overcommit_ratio=self.cpu_overcommit_ratio, memory_overcommit_ratio=self.memory_overcommit_ratio, ) def spawn(self, *args, **kwargs) -> SqlEngineTask: return SqlEngineTask(*args, **kwargs) @property def num_partitions(self) -> int: return self.input_deps[0].num_partitions class UnionNode(Node): """ Union two or more nodes into one flow of data. """ def __init__(self, ctx: Context, input_deps: Tuple[Node, ...]): """ Union two or more `input_deps` into one flow of data. Parameters ---------- input_deps All input deps should have the same set of partition dimensions. """ super().__init__(ctx, input_deps) class RootNode(Node): """ A virtual node that assembles multiple nodes and outputs nothing. """ def __init__(self, ctx: Context, input_deps: Tuple[Node, ...]): """ Assemble multiple nodes into a root node. """ super().__init__(ctx, input_deps) class ConsolidateNode(Node): """ Consolidate partitions into larger ones. """ def __init__(self, ctx: Context, input_dep: Node, dimensions: List[str]): """ Effectively reduces the number of partitions without shuffling the data across the network. Parameters ---------- dimensions Partitions would be grouped by these `dimensions` and consolidated into larger partitions. """ assert isinstance(dimensions, Iterable), f"dimensions is not iterable: {dimensions}" assert all(isinstance(dim, str) for dim in dimensions), f"some dimensions are not strings: {dimensions}" super().__init__(ctx, [input_dep]) self.dimensions = set(list(dimensions) + [PartitionInfo.toplevel_dimension]) def __str__(self) -> str: return super().__str__() + f", dimensions={self.dimensions}" @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> MergeDataSetsTask: return MergeDataSetsTask(runtime_ctx, input_deps, partition_infos) class PartitionNode(Node): """ The base class for all partition nodes. 
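    Examples
    --------
    A minimal sketch using the concrete helpers defined later in this module (the input path is hypothetical):

    ```
    source = DataSourceNode(ctx, ParquetDataSet(["data/*.parquet"]))
    partitions = DataSetPartitionNode(ctx, (source,), npartitions=4, partition_by_rows=True)
    per_partition = SqlEngineNode(ctx, (partitions,), "select count(*) as cnt from {0}")
    ```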
""" max_num_producer_tasks = 100 max_card_of_producers_x_consumers = 4_096_000 def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, dimension: str = None, nested: bool = False, output_name: str = None, output_path: str = None, cpu_limit: int = 1, memory_limit: Optional[int] = None, ) -> None: """ Partition the outputs of `input_deps` into n partitions. Parameters ---------- npartitions The dataset would be split and distributed to `npartitions` partitions. dimension The unique partition dimension. Required if this is a nested partition. nested, optional `npartitions` subpartitions are created in each existing partition of `input_deps` if true. Examples -------- See unit tests in `test/test_partition.py`. For nested partition see `test_nested_partition`. Why nested partition? See **5.1 Partial Partitioning** of [Advanced partitioning techniques for massively distributed computation](https://dl.acm.org/doi/10.1145/2213836.2213839). """ assert isinstance(npartitions, int), f"npartitions is not an integer: {npartitions}" assert dimension is None or re.match("[a-zA-Z0-9_]+", dimension), f"dimension has invalid format: {dimension}" assert not (nested and dimension is None), f"nested partition should have dimension" super().__init__(ctx, input_deps, output_name, output_path, cpu_limit, 0, memory_limit) self.npartitions = npartitions self.dimension = dimension if dimension is not None else PartitionInfo.default_dimension self.nested = nested def __str__(self) -> str: return super().__str__() + f", npartitions={self.npartitions}, dimension={self.dimension}, nested={self.nested}" @Node.task_factory def create_producer_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> PartitionProducerTask: raise NotImplementedError @Node.task_factory def create_consumer_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> PartitionConsumerTask: return PartitionConsumerTask(runtime_ctx, input_deps, partition_infos) @Node.task_factory def create_merge_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> MergeDataSetsTask: return MergeDataSetsTask(runtime_ctx, input_deps, partition_infos) @Node.task_factory def create_split_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> SplitDataSetTask: return SplitDataSetTask(runtime_ctx, input_deps, partition_infos) @property def num_partitions(self) -> int: return self.npartitions class RepeatPartitionNode(PartitionNode): """ Create a new partition dimension by repeating the `input_deps`. This is always a nested partition. """ def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, dimension: str, cpu_limit: int = 1, memory_limit: Optional[int] = None, ) -> None: super().__init__( ctx, input_deps, npartitions, dimension, nested=True, cpu_limit=cpu_limit, memory_limit=memory_limit, ) @Node.task_factory def create_producer_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> RepeatPartitionProducerTask: return RepeatPartitionProducerTask( runtime_ctx, input_deps, partition_infos, self.npartitions, self.dimension, self.cpu_limit, self.memory_limit, ) class UserDefinedPartitionNode(PartitionNode): """ Distribute the output files or rows of `input_deps` into n partitions based on user code. 
See unit test `test_user_defined_partition` in `test/test_partition.py`. """ def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, dimension: str = None, nested: bool = False, cpu_limit: int = 1, memory_limit: Optional[int] = None, ) -> None: super().__init__( ctx, input_deps, npartitions, dimension, nested, cpu_limit=cpu_limit, memory_limit=memory_limit, ) @Node.task_factory def create_producer_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> UserDefinedPartitionProducerTask: return UserDefinedPartitionProducerTask( runtime_ctx, input_deps, partition_infos, self.npartitions, self.dimension, self.partition, self.cpu_limit, self.memory_limit, ) def partition(self, runtime_ctx: RuntimeContext, dataset: DataSet) -> List[DataSet]: raise NotImplementedError class UserPartitionedDataSourceNode(UserDefinedPartitionNode): max_num_producer_tasks = 1 def __init__(self, ctx: Context, partitioned_datasets: List[DataSet], dimension: str = None) -> None: assert isinstance(partitioned_datasets, Iterable) and all(isinstance(dataset, DataSet) for dataset in partitioned_datasets) super().__init__( ctx, [DataSourceNode(ctx, dataset=None)], len(partitioned_datasets), dimension, nested=False, ) self.partitioned_datasets = partitioned_datasets def partition(self, runtime_ctx: RuntimeContext, dataset: DataSet) -> List[DataSet]: return self.partitioned_datasets class EvenlyDistributedPartitionNode(PartitionNode): """ Evenly distribute the output files or rows of `input_deps` into n partitions. """ max_num_producer_tasks = 1 def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, dimension: str = None, nested: bool = False, *, partition_by_rows=False, random_shuffle=False, output_name: str = None, output_path: str = None, cpu_limit: int = 1, memory_limit: Optional[int] = None, ) -> None: """ Evenly distribute the output files or rows of `input_deps` into n partitions. Parameters ---------- partition_by_rows, optional Evenly distribute rows instead of input files into `npartitions` partitions, by default distribute by files. random_shuffle, optional Random shuffle the list of paths or parquet row groups (if `partition_by_rows=True`) in input datasets. """ super().__init__( ctx, input_deps, npartitions, dimension, nested, output_name=output_name, output_path=output_path, cpu_limit=cpu_limit, memory_limit=memory_limit, ) self.partition_by_rows = partition_by_rows and npartitions > 1 self.random_shuffle = random_shuffle def __str__(self) -> str: return super().__str__() + f", partition_by_rows={self.partition_by_rows}, random_shuffle={self.random_shuffle}" @Node.task_factory def create_producer_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ): return EvenlyDistributedPartitionProducerTask( runtime_ctx, input_deps, partition_infos, self.npartitions, self.dimension, self.partition_by_rows, self.random_shuffle, self.cpu_limit, self.memory_limit, ) class LoadPartitionedDataSetNode(PartitionNode): """ Load existing partitioned dataset (only parquet files are supported). 
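    Examples
    --------
    A minimal sketch (the path and partition column name are hypothetical):

    ```
    source = DataSourceNode(ctx, ParquetDataSet(["warehouse/sales/*.parquet"]))
    partitions = LoadPartitionedDataSetNode(ctx, (source,), npartitions=8, data_partition_column="bucket_id")
    ```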
""" max_num_producer_tasks = 10 def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, dimension: str = None, nested: bool = False, data_partition_column: str = None, hive_partitioning: bool = False, cpu_limit: int = 1, memory_limit: Optional[int] = None, ) -> None: assert dimension or data_partition_column, f"Both 'dimension' and 'data_partition_column' are none or empty" super().__init__( ctx, input_deps, npartitions, dimension or data_partition_column, nested, cpu_limit=cpu_limit, memory_limit=memory_limit, ) self.data_partition_column = data_partition_column self.hive_partitioning = hive_partitioning def __str__(self) -> str: return super().__str__() + f", data_partition_column={self.data_partition_column}, hive_partitioning={self.hive_partitioning}" @Node.task_factory def create_producer_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ): return LoadPartitionedDataSetProducerTask( runtime_ctx, input_deps, partition_infos, self.npartitions, self.dimension, self.data_partition_column, self.hive_partitioning, self.cpu_limit, self.memory_limit, ) def DataSetPartitionNode( ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, *, partition_by_rows=False, random_shuffle=False, data_partition_column=None, ): """ Partition the outputs of `input_deps` into n partitions. Parameters ---------- npartitions The number of partitions. The input files or rows would be evenly distributed to `npartitions` partitions. partition_by_rows, optional Evenly distribute rows instead of input files into `npartitions` partitions, by default distribute by files. random_shuffle, optional Random shuffle the list of paths or parquet row groups (if `partition_by_rows=True`) in input datasets. data_partition_column, optional Partition by files based on the partition keys stored in `data_partition_column` if specified. Default column name used by `HashPartitionNode` is `DATA_PARTITION_COLUMN_NAME`. Examples -------- See unit test `test_load_partitioned_datasets` in `test/test_partition.py`. """ assert not (partition_by_rows and data_partition_column), "partition_by_rows and data_partition_column cannot be set at the same time" if data_partition_column is None: partition_node = EvenlyDistributedPartitionNode( ctx, input_deps, npartitions, dimension=None, nested=False, partition_by_rows=partition_by_rows, random_shuffle=random_shuffle, ) if npartitions == 1: return ConsolidateNode(ctx, partition_node, dimensions=[]) else: return partition_node else: return LoadPartitionedDataSetNode( ctx, input_deps, npartitions, dimension=data_partition_column, nested=False, data_partition_column=data_partition_column, hive_partitioning=False, ) class HashPartitionNode(PartitionNode): """ Partition the outputs of `input_deps` into n partitions based on the hash values of `hash_columns`. 
""" default_cpu_limit = 1 default_memory_limit = None default_data_partition_column = DATA_PARTITION_COLUMN_NAME default_engine_type = "duckdb" default_row_group_size = DEFAULT_ROW_GROUP_SIZE max_num_producer_tasks = 1000 enable_resource_boost = True def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, hash_columns: List[str] = None, data_partition_column: str = None, *, dimension: str = None, nested: bool = False, engine_type: Literal["duckdb", "arrow"] = None, random_shuffle: bool = False, shuffle_only: bool = False, drop_partition_column: bool = False, use_parquet_writer: bool = False, hive_partitioning: bool = False, parquet_row_group_size: int = None, parquet_dictionary_encoding=False, parquet_compression="ZSTD", parquet_compression_level=3, output_name: str = None, output_path: str = None, cpu_limit: Optional[int] = None, memory_limit: Optional[int] = None, ) -> None: """ Construct a HashPartitionNode. See :func:`Node.__init__` to find comments on other parameters. Parameters ---------- npartitions The number of hash partitions. The number of generated parquet files would be proportional to `npartitions`. hash_columns The hash values are computed from `hash_columns`. data_partition_column, optional The name of column used to store partition keys. engine_type, optional The underlying query engine for hash partition. random_shuffle, optional Ignore `hash_columns` and shuffle each row to a random partition if true. shuffle_only, optional Ignore `hash_columns` and shuffle each row to the partition specified in `data_partition_column` if true. drop_partition_column, optional Exclude `data_partition_column` in output if true. use_parquet_writer, optional Convert partition data to arrow tables and append with parquet writer if true. This creates less number of intermediate files but makes partitioning slower. hive_partitioning, optional Use Hive partitioned write of duckdb if true. parquet_row_group_size, optional The number of rows stored in each row group of parquet file. Large row group size provides more opportunities to compress the data. Small row groups size could make filtering rows faster and achieve high concurrency. See https://duckdb.org/docs/data/parquet/tips.html#selecting-a-row_group_size. parquet_dictionary_encoding, optional Specify if we should use dictionary encoding in general or only for some columns. See `use_dictionary` in https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html. 
""" assert not random_shuffle or not shuffle_only, f"random_shuffle and shuffle_only cannot be enabled at the same time" assert not shuffle_only or data_partition_column is not None, f"data_partition_column not specified for shuffle-only partitioning" assert data_partition_column is None or re.match( "[a-zA-Z0-9_]+", data_partition_column ), f"data_partition_column has invalid format: {data_partition_column}" assert engine_type in ( None, "duckdb", "arrow", ), f"unknown query engine type: {engine_type}" data_partition_column = data_partition_column or self.default_data_partition_column super().__init__( ctx, input_deps, npartitions, dimension or data_partition_column, nested, output_name, output_path, cpu_limit or self.default_cpu_limit, memory_limit or self.default_memory_limit, ) self.hash_columns = ["random()"] if random_shuffle else hash_columns self.data_partition_column = data_partition_column self.engine_type = engine_type or self.default_engine_type self.random_shuffle = random_shuffle self.shuffle_only = shuffle_only self.drop_partition_column = drop_partition_column self.use_parquet_writer = use_parquet_writer self.hive_partitioning = hive_partitioning and self.engine_type == "duckdb" self.parquet_row_group_size = parquet_row_group_size or self.default_row_group_size self.parquet_dictionary_encoding = parquet_dictionary_encoding self.parquet_compression = parquet_compression self.parquet_compression_level = parquet_compression_level def __str__(self) -> str: return ( super().__str__() + f", hash_columns={self.hash_columns}, data_partition_column={self.data_partition_column}, engine_type={self.engine_type}, hive_partitioning={self.hive_partitioning}" ) @Node.task_factory def create_producer_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> HashPartitionTask: return HashPartitionTask.create( self.engine_type, runtime_ctx, input_deps, partition_infos, self.npartitions, self.dimension, self.hash_columns, self.data_partition_column, self.random_shuffle, self.shuffle_only, self.drop_partition_column, self.use_parquet_writer, self.hive_partitioning, self.parquet_row_group_size, self.parquet_dictionary_encoding, self.parquet_compression, self.parquet_compression_level, self.output_name, self.output_path, self.cpu_limit, self.memory_limit, ) class ShuffleNode(HashPartitionNode): def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], npartitions: int, data_partition_column: str = None, *, dimension: str = None, nested: bool = False, engine_type: Literal["duckdb", "arrow"] = None, use_parquet_writer: bool = False, hive_partitioning: bool = False, parquet_row_group_size: int = None, parquet_dictionary_encoding=False, parquet_compression="ZSTD", parquet_compression_level=3, output_name: str = None, output_path: str = None, cpu_limit: Optional[int] = None, memory_limit: Optional[int] = None, ) -> None: super().__init__( ctx, input_deps, npartitions, hash_columns=None, data_partition_column=data_partition_column, dimension=dimension, nested=nested, engine_type=engine_type, random_shuffle=False, shuffle_only=True, drop_partition_column=False, use_parquet_writer=use_parquet_writer, hive_partitioning=hive_partitioning, parquet_row_group_size=parquet_row_group_size, parquet_dictionary_encoding=parquet_dictionary_encoding, parquet_compression=parquet_compression, parquet_compression_level=parquet_compression_level, output_name=output_name, output_path=output_path, cpu_limit=cpu_limit, memory_limit=memory_limit, ) class 
RangePartitionNode(PartitionNode): """ Partition the outputs of `input_deps` into partitions defined by `split_points`. This node is not implemented yet. """ def __init__( self, ctx: Context, input_deps: Tuple[Node, ...], split_points: List, dimension: str = None, nested: bool = False, output_name: str = None, output_path: str = None, cpu_limit: int = 16, memory_limit: int = 128 * GB, ) -> None: super().__init__( ctx, input_deps, len(split_points) + 1, dimension, nested, output_name, output_path, cpu_limit, memory_limit, ) self.split_points = split_points class ProjectionNode(Node): """ Select columns from output of an input node. """ def __init__( self, ctx: Context, input_dep: Node, columns: List[str] = None, generated_columns: List[Literal["filename", "file_row_number"]] = None, union_by_name=None, ) -> None: """ Construct a ProjectNode to select only the `columns` from output of `input_dep`. Parameters ---------- input_dep The input node whose output would be selected. columns, optional The columns to be selected or created. Select all columns if set to `None`. generated_columns Auto generated columns, supported values: `filename`, `file_row_number`. union_by_name, optional Unify the columns of different files by name (see https://duckdb.org/docs/data/multiple_files/combining_schemas#union-by-name). Examples -------- First create an ArrowComputeNode to extract hosts from urls. .. code-block:: python class ParseUrl(ArrowComputeNode): def process(self, runtime_ctx: RuntimeContext, input_tables: List[arrow.Table]) -> arrow.Table: assert input_tables[0].column_names == ["url"] # check url is the only column in table urls, = input_tables[0].columns hosts = [url.as_py().split("/", maxsplit=2)[0] for url in urls] return arrow.Table.from_arrays([hosts, urls], names=["host", "url"]) Suppose there are several columns in output of `data_partitions`, `ProjectionNode(..., ["url"])` selects the `url` column. Then only this column would be loaded into arrow table when feeding data to `ParseUrl`. .. code-block:: python urls_with_host = ParseUrl(ctx, (ProjectionNode(ctx, data_partitions, ["url"]),)) """ columns = columns or ["*"] generated_columns = generated_columns or [] assert all(col in GENERATED_COLUMNS for col in generated_columns), f"invalid values found in generated columns: {generated_columns}" assert not (set(columns) & set(generated_columns)), f"columns {columns} and generated columns {generated_columns} share common columns" super().__init__(ctx, [input_dep]) self.columns = columns self.generated_columns = generated_columns self.union_by_name = union_by_name def __str__(self) -> str: return super().__str__() + f", columns={self.columns}, generated_columns={self.generated_columns}, union_by_name={self.union_by_name}" @Node.task_factory def create_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> ProjectionTask: return ProjectionTask( runtime_ctx, input_deps, partition_infos, self.columns, self.generated_columns, self.union_by_name, ) class LimitNode(SqlEngineNode): """ Limit the number of rows of the output of an input node. """ def __init__(self, ctx: Context, input_dep: Node, limit: int) -> None: """ Construct a LimitNode to limit the number of rows of the output of `input_dep`. Parameters ---------- input_dep The input node whose output would be limited. limit The number of rows to be limited. 
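        Examples
        --------
        A minimal sketch (`source` is a hypothetical upstream node):

        ```
        preview = LimitNode(ctx, source, 100)   # keep at most 100 rows
        ```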
""" super().__init__(ctx, (input_dep,), f"select * from {{0}} limit {limit}") self.limit = limit def __str__(self) -> str: return super().__str__() + f", limit={self.limit}" @Node.task_factory def create_merge_task( self, runtime_ctx: RuntimeContext, input_deps: List[Task], partition_infos: List[PartitionInfo], ) -> MergeDataSetsTask: return MergeDataSetsTask(runtime_ctx, input_deps, partition_infos) T = TypeVar("T") class LogicalPlanVisitor(Generic[T]): """ Visit the nodes of a logcial plan in depth-first order. """ def visit(self, node: Node, depth: int = 0) -> T: """ Visit a node depending on its type. If the method for the node type is not implemented, call `generic_visit`. """ if isinstance(node, DataSourceNode): return self.visit_data_source_node(node, depth) elif isinstance(node, DataSinkNode): return self.visit_data_sink_node(node, depth) elif isinstance(node, RootNode): return self.visit_root_node(node, depth) elif isinstance(node, UnionNode): return self.visit_union_node(node, depth) elif isinstance(node, ConsolidateNode): return self.visit_consolidate_node(node, depth) elif isinstance(node, PartitionNode): return self.visit_partition_node(node, depth) elif isinstance(node, PythonScriptNode): return self.visit_python_script_node(node, depth) elif isinstance(node, ArrowComputeNode): return self.visit_arrow_compute_node(node, depth) elif isinstance(node, ArrowStreamNode): return self.visit_arrow_stream_node(node, depth) elif isinstance(node, LimitNode): return self.visit_limit_node(node, depth) elif isinstance(node, SqlEngineNode): return self.visit_query_engine_node(node, depth) elif isinstance(node, ProjectionNode): return self.visit_projection_node(node, depth) else: raise Exception(f"Unknown node type: {node}") def generic_visit(self, node: Node, depth: int) -> T: """This visitor calls visit() on all children of the node.""" for dep in node.input_deps: self.visit(dep, depth + 1) def visit_data_source_node(self, node: DataSourceNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_data_sink_node(self, node: DataSinkNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_root_node(self, node: RootNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_union_node(self, node: UnionNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_consolidate_node(self, node: ConsolidateNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_partition_node(self, node: PartitionNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_python_script_node(self, node: PythonScriptNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_arrow_compute_node(self, node: ArrowComputeNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_arrow_stream_node(self, node: ArrowStreamNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_query_engine_node(self, node: SqlEngineNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_projection_node(self, node: ProjectionNode, depth: int) -> T: return self.generic_visit(node, depth) def visit_limit_node(self, node: LimitNode, depth: int) -> T: return self.generic_visit(node, depth) class LogicalPlan(object): """ The logical plan that defines a directed acyclic computation graph. 
""" def __init__(self, ctx: Context, root_node: Node) -> None: self.ctx = ctx self.root_node = root_node def __str__(self) -> str: return self.explain_str() def explain_str(self) -> str: """ Return a string that shows the structure of the logical plan. """ visited = set() def to_str(node: Node, depth: int = 0) -> List[str]: lines = [" " * depth + str(node) + ", file= " + node.location] if node.id in visited: return lines + [" " * depth + " (omitted ...)"] visited.add(node.id) lines += [" " * depth + f" | {name}: {stats}" for name, stats in node.perf_stats.items()] for dep in node.input_deps: lines.extend(to_str(dep, depth + 1)) return lines return os.linesep.join(to_str(self.root_node)) def graph(self) -> Digraph: """ Return a graphviz graph that shows the structure of the logical plan. """ dot = Digraph(comment="smallpond") for node in self.nodes.values(): dot.node(str(node.id), repr(node)) for dep in node.input_deps: dot.edge(str(dep.id), str(node.id)) return dot @property def nodes(self) -> Dict[NodeId, Node]: nodes = {} def collect_nodes(node: Node): if node.id in nodes: return nodes[node.id] = node for dep in node.input_deps: collect_nodes(dep) collect_nodes(self.root_node) return nodes