awswrangler/distributed/ray/datasources/arrow_csv_datasink.py (33 lines of code) (raw):

"""Ray PandasTextDatasink Module.""" from __future__ import annotations import io import logging from typing import Any from pyarrow import csv from ray.data.block import BlockAccessor from ray.data.datasource.filename_provider import FilenameProvider from awswrangler.distributed.ray.datasources.file_datasink import _BlockFileDatasink _logger: logging.Logger = logging.getLogger(__name__) class ArrowCSVDatasink(_BlockFileDatasink): """A datasink that writes CSV files using Arrow.""" def __init__( self, path: str, *, filename_provider: FilenameProvider | None = None, dataset_uuid: str | None = None, open_s3_object_args: dict[str, Any] | None = None, pandas_kwargs: dict[str, Any] | None = None, write_options: dict[str, Any] | None = None, **write_args: Any, ): super().__init__( path, file_format="csv", filename_provider=filename_provider, dataset_uuid=dataset_uuid, open_s3_object_args=open_s3_object_args, pandas_kwargs=pandas_kwargs, **write_args, ) self.write_options = write_options or {} def write_block(self, file: io.TextIOWrapper, block: BlockAccessor) -> None: """ Write a block of data to a file. Parameters ---------- block : BlockAccessor file : io.TextIOWrapper """ csv.write_csv(block.to_arrow(), file, csv.WriteOptions(**self.write_options))