awswrangler/distributed/ray/modin/s3/_write_orc.py

"""Modin on Ray S3 write parquet module (PRIVATE).""" from __future__ import annotations import logging import math from typing import TYPE_CHECKING, Any, cast import modin.pandas as pd import pyarrow as pa from awswrangler import exceptions from awswrangler.distributed.ray.datasources import ArrowORCDatasink from awswrangler.distributed.ray.modin._utils import _ray_dataset_from_df from awswrangler.typing import ArrowEncryptionConfiguration if TYPE_CHECKING: from mypy_boto3_s3 import S3Client _logger: logging.Logger = logging.getLogger(__name__) def _to_orc_distributed( df: pd.DataFrame, schema: pa.Schema, index: bool, compression: str | None, compression_ext: str, pyarrow_additional_kwargs: dict[str, Any], cpus: int, dtype: dict[str, str], s3_client: "S3Client" | None, s3_additional_kwargs: dict[str, str] | None, use_threads: bool | int, path: str | None = None, path_root: str | None = None, filename_prefix: str | None = None, max_rows_by_file: int | None = 0, bucketing: bool = False, encryption_configuration: ArrowEncryptionConfiguration | None = None, ) -> list[str]: # Create Ray Dataset ds = _ray_dataset_from_df(df) if df.index.name is not None: raise exceptions.InvalidArgumentCombination("Orc does not serialize index metadata on a default index.") # Repartition into a single block if or writing into a single key or if bucketing is enabled if ds.count() > 0 and (path or bucketing) and not max_rows_by_file: _logger.warning( "Repartitioning frame to single partition as a strict path was defined: %s. " "This operation is inefficient for large datasets.", path, ) ds = ds.repartition(1) # Repartition by max_rows_by_file elif max_rows_by_file and (max_rows_by_file > 0): ds = ds.repartition(math.ceil(ds.count() / max_rows_by_file)) if path and not path.endswith("/"): path = f"{path}/" datasink = ArrowORCDatasink( path=cast(str, path or path_root), dataset_uuid=filename_prefix, open_s3_object_args={ "s3_additional_kwargs": s3_additional_kwargs, }, index=index, dtype=dtype, compression=compression, pyarrow_additional_kwargs=pyarrow_additional_kwargs, schema=schema, bucket_id=df.name if bucketing else None, ) ds.write_datasink(datasink) return datasink.get_write_paths()