awswrangler/distributed/ray/datasources/arrow_parquet_base_datasource.py
"""Ray ParquetBaseDatasource Module."""
from __future__ import annotations

from typing import Any, Iterator

# fs required to implicitly trigger S3 subsystem initialization
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq
from ray.data.datasource.file_based_datasource import FileBasedDatasource

from awswrangler._arrow import _add_table_partitions


class ArrowParquetBaseDatasource(FileBasedDatasource):
"""Parquet datasource, for reading Parquet files."""
    _FILE_EXTENSIONS = ["parquet"]

    def __init__(
self,
paths: str | list[str],
path_root: str,
arrow_parquet_args: dict[str, Any] | None = None,
**file_based_datasource_kwargs: Any,
):
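        """Store Parquet read settings on top of the base file datasource setup.

        ``path_root`` is the dataset root used later to derive partition columns
        from each file path; ``arrow_parquet_args`` holds PyArrow read options
        such as ``columns``, ``use_threads`` and ``dataset_kwargs``.
        """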
super().__init__(paths, **file_based_datasource_kwargs)
if arrow_parquet_args is None:
arrow_parquet_args = {}
self.path_root = path_root
        self.arrow_parquet_args = arrow_parquet_args

    def _read_stream(self, f: pa.NativeFile, path: str) -> Iterator[pa.Table]:
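        """Read a single Parquet file into one Arrow table.

        Partition columns encoded in the file path are appended to the table
        relative to ``self.path_root``.
        """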
arrow_parquet_args = self.arrow_parquet_args
use_threads: bool = arrow_parquet_args.get("use_threads", False)
columns: list[str] | None = arrow_parquet_args.get("columns", None)
dataset_kwargs = arrow_parquet_args.get("dataset_kwargs", {})
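        # Only these two dataset-level options are forwarded to pyarrow.parquet.read_table.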
coerce_int96_timestamp_unit: str | None = dataset_kwargs.get("coerce_int96_timestamp_unit", None)
decryption_properties = dataset_kwargs.get("decryption_properties", None)
table = pq.read_table(
f,
use_threads=use_threads,
columns=columns,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
decryption_properties=decryption_properties,
)
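        # `path` arrives without a scheme, so re-prefix it with "s3://" before
        # deriving partition values relative to `path_root`.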
table = _add_table_partitions(
table=table,
path=f"s3://{path}",
path_root=self.path_root,
)
        return [table]  # type: ignore[return-value]

    def _open_input_source(
self,
filesystem: pyarrow.fs.FileSystem,
path: str,
**open_args: Any,
) -> pa.NativeFile:
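        """Open ``path`` with the resolved filesystem for random-access reads."""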
# Parquet requires `open_input_file` due to random access reads
        return filesystem.open_input_file(path, **open_args)

    def get_name(self) -> str:
"""Return a human-readable name for this datasource."""
return "ParquetBulk"
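
# Usage sketch (illustrative only): one way Ray Data could consume this datasource.
# It assumes an initialized Ray runtime; the bucket, prefix, and column names are
# hypothetical placeholders, not part of this module.
#
#   import ray
#
#   datasource = ArrowParquetBaseDatasource(
#       paths="s3://bucket/prefix/",
#       path_root="s3://bucket/prefix/",
#       arrow_parquet_args={"columns": ["id", "value"], "use_threads": False},
#   )
#   ds = ray.data.read_datasource(datasource)
#   df = ds.to_pandas()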