awswrangler/distributed/ray/modin/s3/_write_orc.py
"""Modin on Ray S3 write parquet module (PRIVATE)."""
from __future__ import annotations

import logging
import math
from typing import TYPE_CHECKING, Any, cast

import modin.pandas as pd
import pyarrow as pa

from awswrangler import exceptions
from awswrangler.distributed.ray.datasources import ArrowORCDatasink
from awswrangler.distributed.ray.modin._utils import _ray_dataset_from_df
from awswrangler.typing import ArrowEncryptionConfiguration

if TYPE_CHECKING:
    from mypy_boto3_s3 import S3Client

_logger: logging.Logger = logging.getLogger(__name__)


def _to_orc_distributed(
    df: pd.DataFrame,
    schema: pa.Schema,
    index: bool,
    compression: str | None,
    compression_ext: str,
    pyarrow_additional_kwargs: dict[str, Any],
    cpus: int,
    dtype: dict[str, str],
    s3_client: "S3Client" | None,
    s3_additional_kwargs: dict[str, str] | None,
    use_threads: bool | int,
    path: str | None = None,
    path_root: str | None = None,
    filename_prefix: str | None = None,
    max_rows_by_file: int | None = 0,
    bucketing: bool = False,
    encryption_configuration: ArrowEncryptionConfiguration | None = None,
) -> list[str]:
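    """Write a Modin DataFrame as ORC objects on S3 through a Ray datasink.

    The frame is converted into a Ray Dataset, repartitioned according to
    ``path``/``bucketing``/``max_rows_by_file``, and written via ``ArrowORCDatasink``.
    Returns the list of S3 paths that were written.
    """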
    # Create Ray Dataset
    ds = _ray_dataset_from_df(df)

    if df.index.name is not None:
        raise exceptions.InvalidArgumentCombination("Orc does not serialize index metadata on a default index.")

    # Repartition into a single block if writing into a single key or if bucketing is enabled
    if ds.count() > 0 and (path or bucketing) and not max_rows_by_file:
        _logger.warning(
            "Repartitioning frame to single partition as a strict path was defined: %s. "
            "This operation is inefficient for large datasets.",
            path,
        )
        ds = ds.repartition(1)
    # Repartition by max_rows_by_file
    elif max_rows_by_file and (max_rows_by_file > 0):
        ds = ds.repartition(math.ceil(ds.count() / max_rows_by_file))
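        # Illustrative numbers (not from the source): 1_000_000 rows with max_rows_by_file=300_000
        # gives ceil(1_000_000 / 300_000) = 4 blocks of roughly 250_000 rows each, so no output
        # file exceeds max_rows_by_file rows.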
    if path and not path.endswith("/"):
        path = f"{path}/"
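    # `path or path_root` selects the S3 prefix the ORC objects are written under;
    # `filename_prefix` is forwarded as the datasink's `dataset_uuid` and is used when
    # naming the generated objects.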
    datasink = ArrowORCDatasink(
        path=cast(str, path or path_root),
        dataset_uuid=filename_prefix,
        open_s3_object_args={
            "s3_additional_kwargs": s3_additional_kwargs,
        },
        index=index,
        dtype=dtype,
        compression=compression,
        pyarrow_additional_kwargs=pyarrow_additional_kwargs,
        schema=schema,
        bucket_id=df.name if bucketing else None,
    )

    ds.write_datasink(datasink)
    return datasink.get_write_paths()