awswrangler/distributed/ray/datasources/pandas_text_datasink.py
"""Ray PandasTextDatasink Module."""
from __future__ import annotations
import io
import logging
from typing import Any, Callable
import pandas as pd
from ray.data.block import BlockAccessor
from ray.data.datasource.filename_provider import FilenameProvider
from awswrangler.distributed.ray.datasources.file_datasink import _BlockFileDatasink
_logger: logging.Logger = logging.getLogger(__name__)
class _PandasTextDatasink(_BlockFileDatasink):
"""A datasink that writes text files using Pandas IO."""
def __init__(
self,
path: str,
file_format: str,
write_text_func: Callable[..., None] | None,
*,
filename_provider: FilenameProvider | None = None,
dataset_uuid: str | None = None,
open_s3_object_args: dict[str, Any] | None = None,
pandas_kwargs: dict[str, Any] | None = None,
**write_args: Any,
):
super().__init__(
path,
file_format=file_format,
filename_provider=filename_provider,
dataset_uuid=dataset_uuid,
open_s3_object_args=open_s3_object_args,
pandas_kwargs=pandas_kwargs,
**write_args,
)
self.write_text_func = write_text_func

    def write_block(self, file: io.TextIOWrapper, block: BlockAccessor) -> None:
        """
        Write a block of data to a file.

        Parameters
        ----------
        file : io.TextIOWrapper
            Open text stream to write the serialized block to.
        block : BlockAccessor
            Block of data to serialize with the configured Pandas writer.
        """
        write_text_func = self.write_text_func

        write_text_func(block.to_pandas(), file, **self.pandas_kwargs)  # type: ignore[misc]
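

# NOTE: for the subclasses below, the ``write_text_func`` call above reduces to the
# unbound Pandas writer applied to the materialized block; for the CSV case it is
# effectively
#
#     block.to_pandas().to_csv(file, **pandas_kwargs)
#
# so ``pandas_kwargs`` map one-to-one onto the keyword arguments of the
# corresponding ``pd.DataFrame.to_*`` method.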


class PandasCSVDatasink(_PandasTextDatasink):
    """A datasink that writes CSV files using Pandas IO."""

    def __init__(
self,
path: str,
*,
filename_provider: FilenameProvider | None = None,
dataset_uuid: str | None = None,
open_s3_object_args: dict[str, Any] | None = None,
pandas_kwargs: dict[str, Any] | None = None,
**write_args: Any,
):
super().__init__(
path,
"csv",
pd.DataFrame.to_csv,
filename_provider=filename_provider,
dataset_uuid=dataset_uuid,
open_s3_object_args=open_s3_object_args,
pandas_kwargs=pandas_kwargs,
**write_args,
)
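

# Usage sketch (illustrative): these datasinks are consumed through Ray Data's
# datasink API. Assuming ``ds`` is a ``ray.data.Dataset`` and the S3 prefix is a
# hypothetical destination, a distributed CSV write could look like:
#
#     datasink = PandasCSVDatasink(
#         "s3://example-bucket/output/",      # hypothetical prefix
#         pandas_kwargs={"index": False},     # forwarded to pd.DataFrame.to_csv
#     )
#     ds.write_datasink(datasink)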


class PandasJSONDatasink(_PandasTextDatasink):
    """A datasink that writes JSON files using Pandas IO."""

    def __init__(
self,
path: str,
*,
filename_provider: FilenameProvider | None = None,
dataset_uuid: str | None = None,
open_s3_object_args: dict[str, Any] | None = None,
pandas_kwargs: dict[str, Any] | None = None,
**write_args: Any,
):
super().__init__(
path,
"json",
pd.DataFrame.to_json,
filename_provider=filename_provider,
dataset_uuid=dataset_uuid,
open_s3_object_args=open_s3_object_args,
pandas_kwargs=pandas_kwargs,
**write_args,
)
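

# Similarly, a JSON Lines write (sketch; the keyword values are illustrative and
# are forwarded verbatim to pd.DataFrame.to_json):
#
#     datasink = PandasJSONDatasink(
#         "s3://example-bucket/output/",                       # hypothetical prefix
#         pandas_kwargs={"orient": "records", "lines": True},
#     )
#     ds.write_datasink(datasink)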