awswrangler/distributed/ray/datasources/arrow_orc_datasink.py (47 lines of code) (raw):

"""Ray PandasTextDatasink Module.""" from __future__ import annotations import io import logging from typing import Any import pyarrow as pa from ray.data.block import BlockAccessor from ray.data.datasource.filename_provider import FilenameProvider from awswrangler._arrow import _df_to_table from awswrangler.distributed.ray.datasources.file_datasink import _BlockFileDatasink _logger: logging.Logger = logging.getLogger(__name__) class ArrowORCDatasink(_BlockFileDatasink): """A datasink that writes CSV files using Arrow.""" def __init__( self, path: str, *, filename_provider: FilenameProvider | None = None, dataset_uuid: str | None = None, open_s3_object_args: dict[str, Any] | None = None, pandas_kwargs: dict[str, Any] | None = None, schema: pa.Schema | None = None, index: bool = False, dtype: dict[str, str] | None = None, pyarrow_additional_kwargs: dict[str, Any] | None = None, **write_args: Any, ): super().__init__( path, file_format="orc", filename_provider=filename_provider, dataset_uuid=dataset_uuid, open_s3_object_args=open_s3_object_args, pandas_kwargs=pandas_kwargs, **write_args, ) self.pyarrow_additional_kwargs = pyarrow_additional_kwargs or {} self.schema = schema self.index = index self.dtype = dtype def write_block(self, file: io.TextIOWrapper, block: BlockAccessor) -> None: """ Write a block of data to a file. Parameters ---------- file : io.TextIOWrapper block : BlockAccessor """ from pyarrow import orc compression: str = self.write_args.get("compression", None) or "UNCOMPRESSED" orc.write_table( _df_to_table(block.to_pandas(), schema=self.schema, index=self.index, dtype=self.dtype), file, compression=compression, **self.pyarrow_additional_kwargs, )