awswrangler/s3/_read_orc.py

"""Amazon S3 Read ORC Module (PRIVATE).""" from __future__ import annotations import datetime import itertools import logging from typing import ( TYPE_CHECKING, Any, Callable, ) import boto3 import pandas as pd import pyarrow as pa import pyarrow.dataset from typing_extensions import Literal from awswrangler import _data_types, _utils, exceptions from awswrangler._arrow import _add_table_partitions from awswrangler._config import apply_configs from awswrangler._distributed import engine from awswrangler._executor import _BaseExecutor, _get_executor from awswrangler.s3._fs import open_s3_object from awswrangler.s3._list import _path2list from awswrangler.s3._read import ( _apply_partition_filter, _check_version_id, _extract_partitions_dtypes_from_table_details, _get_num_output_blocks, _get_path_ignore_suffix, _get_path_root, _get_paths_for_glue_table, _TableMetadataReader, ) from awswrangler.typing import RaySettings, _ReadTableMetadataReturnValue if TYPE_CHECKING: from mypy_boto3_s3 import S3Client from pyarrow.orc import ORCFile FULL_READ_S3_BLOCK_SIZE = 20_971_520 # 20 MB (20 * 2**20) METADATA_READ_S3_BLOCK_SIZE = 131_072 # 128 KB (128 * 2**10) _logger: logging.Logger = logging.getLogger(__name__) def _pyarrow_orc_file_wrapper(source: Any) -> "ORCFile": from pyarrow.orc import ORCFile try: return ORCFile(source=source) except pyarrow.ArrowInvalid as ex: if str(ex) == "ORC file size is 0 bytes": _logger.warning("Ignoring empty file...") return None raise @engine.dispatch_on_engine def _read_orc_metadata_file( s3_client: "S3Client" | None, path: str, s3_additional_kwargs: dict[str, str] | None, use_threads: bool | int, version_id: str | None = None, ) -> pa.schema: with open_s3_object( path=path, mode="rb", version_id=version_id, use_threads=use_threads, s3_client=s3_client, s3_block_size=METADATA_READ_S3_BLOCK_SIZE, s3_additional_kwargs=s3_additional_kwargs, ) as f: orc_file: "ORCFile" | None = _pyarrow_orc_file_wrapper(source=f) if orc_file: return orc_file.schema return None class _ORCTableMetadataReader(_TableMetadataReader): def _read_metadata_file( self, s3_client: "S3Client" | None, path: str, s3_additional_kwargs: dict[str, str] | None, use_threads: bool | int, version_id: str | None = None, coerce_int96_timestamp_unit: str | None = None, ) -> pa.schema: return _read_orc_metadata_file( s3_client=s3_client, path=path, s3_additional_kwargs=s3_additional_kwargs, use_threads=use_threads, version_id=version_id, ) def _read_orc_file( s3_client: "S3Client" | None, path: str, path_root: str | None, columns: list[str] | None, s3_additional_kwargs: dict[str, str] | None, use_threads: bool | int, version_id: str | None = None, ) -> pa.Table: s3_block_size: int = FULL_READ_S3_BLOCK_SIZE if columns else -1 # One shot for a full read or see constant with open_s3_object( path=path, mode="rb", version_id=version_id, use_threads=use_threads, s3_block_size=s3_block_size, s3_additional_kwargs=s3_additional_kwargs, s3_client=s3_client, ) as f: orc_file: "ORCFile" | None = _pyarrow_orc_file_wrapper( source=f, ) if orc_file is None: raise exceptions.InvalidFile(f"Invalid ORC file: {path}") return _add_table_partitions( table=orc_file.read(columns=columns), path=path, path_root=path_root, ) @engine.dispatch_on_engine def _read_orc( paths: list[str], path_root: str | None, schema: pa.schema | None, columns: list[str] | None, use_threads: bool | int, override_num_blocks: int, version_ids: dict[str, str] | None, s3_client: "S3Client" | None, s3_additional_kwargs: dict[str, Any] | None, arrow_kwargs: 
@_utils.validate_distributed_kwargs(
    unsupported_kwargs=["boto3_session", "version_id", "s3_additional_kwargs", "dtype_backend"],
)
@apply_configs
def read_orc(
    path: str | list[str],
    path_root: str | None = None,
    dataset: bool = False,
    path_suffix: str | list[str] | None = None,
    path_ignore_suffix: str | list[str] | None = None,
    ignore_empty: bool = True,
    partition_filter: Callable[[dict[str, str]], bool] | None = None,
    columns: list[str] | None = None,
    validate_schema: bool = False,
    last_modified_begin: datetime.datetime | None = None,
    last_modified_end: datetime.datetime | None = None,
    version_id: str | dict[str, str] | None = None,
    dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
    use_threads: bool | int = True,
    ray_args: RaySettings | None = None,
    boto3_session: boto3.Session | None = None,
    s3_additional_kwargs: dict[str, Any] | None = None,
    pyarrow_additional_kwargs: dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Read ORC file(s) from an S3 prefix or list of S3 objects paths.

    The concept of `dataset` enables more complex features like partitioning
    and catalog integration (AWS Glue Catalog).

    This function accepts Unix shell-style wildcards in the path argument.
    * (matches everything), ? (matches any single character),
    [seq] (matches any character in seq), [!seq] (matches any character not in seq).
    If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`),
    you can use `glob.escape(path)` before passing the argument to this function.

    Note
    ----
    If `use_threads=True`, the number of threads is obtained from os.cpu_count().

    Note
    ----
    Filtering by `last_modified_begin` and `last_modified_end` is applied after listing all S3 files.

    Parameters
    ----------
    path
        S3 prefix (accepts Unix shell-style wildcards)
        (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
    path_root
        Root path of the dataset. If dataset=`True`, it is used as a starting point to load partition columns.
    dataset
        If `True`, read an ORC dataset instead of individual file(s), loading all related partitions as columns.
    path_suffix
        Suffix or List of suffixes to be read (e.g. [".gz.orc", ".snappy.orc"]).
        If None, reads all files. (default)
    path_ignore_suffix
        Suffix or List of suffixes to be ignored. (e.g. [".csv", "_SUCCESS"]).
        If None, reads all files. (default)
    ignore_empty
        Ignore files with 0 bytes.
    partition_filter
        Callback Function filters to apply on PARTITION columns (PUSH-DOWN filter).
        This function must receive a single argument (Dict[str, str]) where keys are partitions names
        and values are partitions values. Partitions values must be strings and the function
        must return a bool, True to read the partition or False to ignore it.
        Ignored if `dataset=False`.
        E.g. ``lambda x: True if x["year"] == "2020" and x["month"] == "1" else False``
        https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/023%20-%20Flexible%20Partitions%20Filter.html
    columns
        List of columns to read from the file(s).
    validate_schema
        Check that the schema is consistent across individual files.
    last_modified_begin
        Filter S3 objects by Last modified date.
        Filter is only applied after listing all objects.
    last_modified_end
        Filter S3 objects by Last modified date.
        Filter is only applied after listing all objects.
    version_id
        Version id of the object or mapping of object path to version id.
        (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
    dtype_backend
        Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
        nullable dtypes are used for all dtypes that have a nullable implementation when
        "numpy_nullable" is set, pyarrow is used for all dtypes if "pyarrow" is set.

        The dtype_backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above.
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the max number of threads.
        If integer is provided, specified number is used.
    ray_args
        Parameters of the Ray Modin settings. Only used when distributed computing is used with Ray and Modin installed.
    boto3_session
        Boto3 Session. The default boto3 session is used if None is received.
    s3_additional_kwargs
        Forward to S3 botocore requests.
    pyarrow_additional_kwargs
        Forwarded to the `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
        Valid values include "split_blocks", "self_destruct", "ignore_metadata".
        e.g. pyarrow_additional_kwargs={'split_blocks': True}.

    Returns
    -------
        Pandas DataFrame.

    Examples
    --------
    Reading all ORC files under a prefix

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc(path='s3://bucket/prefix/')

    Reading all ORC files from a list

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc(path=['s3://bucket/filename0.orc', 's3://bucket/filename1.orc'])

    Reading ORC Dataset with PUSH-DOWN filter over partitions

    >>> import awswrangler as wr
    >>> my_filter = lambda x: True if x["city"].startswith("new") else False
    >>> df = wr.s3.read_orc(path='s3://bucket/prefix/', dataset=True, partition_filter=my_filter)
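
    Reading a subset of columns with the PyArrow dtype backend (a minimal sketch; bucket, prefix, and
    column names below are placeholders)

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc(
    ...     path='s3://bucket/prefix/',
    ...     columns=['id', 'value'],
    ...     dtype_backend='pyarrow',
    ... )
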
    """
    s3_client = _utils.client(service_name="s3", session=boto3_session)
    paths: list[str] = _path2list(
        path=path,
        s3_client=s3_client,
        suffix=path_suffix,
        ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
        last_modified_begin=last_modified_begin,
        last_modified_end=last_modified_end,
        ignore_empty=ignore_empty,
        s3_additional_kwargs=s3_additional_kwargs,
    )
    if not path_root:
        path_root = _get_path_root(path=path, dataset=dataset)
    if path_root and partition_filter:
        paths = _apply_partition_filter(path_root=path_root, paths=paths, filter_func=partition_filter)
    if len(paths) < 1:
        raise exceptions.NoFilesFound(f"No files Found on: {path}.")

    version_ids = _check_version_id(paths=paths, version_id=version_id)

    # Create PyArrow schema based on file metadata, columns filter, and partitions
    schema: pa.schema | None = None
    if validate_schema:
        metadata_reader = _ORCTableMetadataReader()
        schema = metadata_reader.validate_schemas(
            paths=paths,
            path_root=path_root,
            columns=columns,
            validate_schema=validate_schema,
            s3_client=s3_client,
            version_ids=version_ids,
            use_threads=use_threads,
            s3_additional_kwargs=s3_additional_kwargs,
        )

    arrow_kwargs = _data_types.pyarrow2pandas_defaults(
        use_threads=use_threads, kwargs=pyarrow_additional_kwargs, dtype_backend=dtype_backend
    )

    return _read_orc(
        paths,
        path_root=path_root,
        schema=schema,
        columns=columns,
        use_threads=use_threads,
        override_num_blocks=_get_num_output_blocks(ray_args),
        s3_client=s3_client,
        s3_additional_kwargs=s3_additional_kwargs,
        arrow_kwargs=arrow_kwargs,
        version_ids=version_ids,
    )
@_utils.validate_distributed_kwargs(
    unsupported_kwargs=["boto3_session", "s3_additional_kwargs", "dtype_backend"],
)
@apply_configs
def read_orc_table(
    table: str,
    database: str,
    filename_suffix: str | list[str] | None = None,
    filename_ignore_suffix: str | list[str] | None = None,
    catalog_id: str | None = None,
    partition_filter: Callable[[dict[str, str]], bool] | None = None,
    columns: list[str] | None = None,
    validate_schema: bool = True,
    dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
    use_threads: bool | int = True,
    ray_args: RaySettings | None = None,
    boto3_session: boto3.Session | None = None,
    s3_additional_kwargs: dict[str, Any] | None = None,
    pyarrow_additional_kwargs: dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Read Apache ORC table registered in the AWS Glue Catalog.

    Note
    ----
    If `use_threads=True`, the number of threads is obtained from os.cpu_count().

    Parameters
    ----------
    table
        AWS Glue Catalog table name.
    database
        AWS Glue Catalog database name.
    filename_suffix
        Suffix or List of suffixes to be read (e.g. [".gz.orc", ".snappy.orc"]).
        If None, read all files. (default)
    filename_ignore_suffix
        Suffix or List of suffixes for S3 keys to be ignored. (e.g. [".csv", "_SUCCESS"]).
        If None, read all files. (default)
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    partition_filter
        Callback Function filters to apply on PARTITION columns (PUSH-DOWN filter).
        This function must receive a single argument (Dict[str, str]) where keys are partitions names
        and values are partitions values. Partitions values must be strings and the function
        must return a bool, True to read the partition or False to ignore it.
        E.g. ``lambda x: True if x["year"] == "2020" and x["month"] == "1" else False``
        https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/023%20-%20Flexible%20Partitions%20Filter.html
    columns
        List of columns to read from the file(s).
    validate_schema
        Check that the schema is consistent across individual files.
    dtype_backend
        Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
        nullable dtypes are used for all dtypes that have a nullable implementation when
        "numpy_nullable" is set, pyarrow is used for all dtypes if "pyarrow" is set.

        The dtype_backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above.
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the max number of threads.
        If integer is provided, specified number is used.
    ray_args
        Parameters of the Ray Modin settings. Only used when distributed computing is used with Ray and Modin installed.
    boto3_session
        Boto3 Session. The default boto3 session is used if None is received.
    s3_additional_kwargs
        Forward to S3 botocore requests.
    pyarrow_additional_kwargs
        Forwarded to the `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
        Valid values include "split_blocks", "self_destruct", "ignore_metadata".
        e.g. pyarrow_additional_kwargs={'split_blocks': True}.

    Returns
    -------
        Pandas DataFrame.

    Examples
    --------
    Reading ORC Table

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc_table(database='...', table='...')

    Reading ORC Table with PUSH-DOWN filter over partitions

    >>> import awswrangler as wr
    >>> my_filter = lambda x: True if x["city"].startswith("new") else False
    >>> df = wr.s3.read_orc_table(database='...', table='...', partition_filter=my_filter)
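
    Reading a subset of columns from a Glue Catalog table (a minimal sketch; database, table, and
    column names below are placeholders)

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc_table(
    ...     database='my_database',
    ...     table='my_table',
    ...     columns=['id', 'value'],
    ... )
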
    """
    paths: str | list[str]
    path_root: str | None

    paths, path_root, res = _get_paths_for_glue_table(
        table=table,
        database=database,
        filename_suffix=filename_suffix,
        filename_ignore_suffix=filename_ignore_suffix,
        catalog_id=catalog_id,
        partition_filter=partition_filter,
        boto3_session=boto3_session,
        s3_additional_kwargs=s3_additional_kwargs,
    )

    df = read_orc(
        path=paths,
        path_root=path_root,
        dataset=True,
        path_suffix=filename_suffix if path_root is None else None,
        path_ignore_suffix=filename_ignore_suffix if path_root is None else None,
        columns=columns,
        validate_schema=validate_schema,
        dtype_backend=dtype_backend,
        use_threads=use_threads,
        ray_args=ray_args,
        boto3_session=boto3_session,
        s3_additional_kwargs=s3_additional_kwargs,
        pyarrow_additional_kwargs=pyarrow_additional_kwargs,
    )

    return _data_types.cast_pandas_with_athena_types(
        df=df,
        dtype=_extract_partitions_dtypes_from_table_details(response=res),
        dtype_backend=dtype_backend,
    )


@_utils.validate_distributed_kwargs(
    unsupported_kwargs=["boto3_session"],
)
@apply_configs
def read_orc_metadata(
    path: str | list[str],
    dataset: bool = False,
    version_id: str | dict[str, str] | None = None,
    path_suffix: str | None = None,
    path_ignore_suffix: str | list[str] | None = None,
    ignore_empty: bool = True,
    ignore_null: bool = False,
    dtype: dict[str, str] | None = None,
    sampling: float = 1.0,
    use_threads: bool | int = True,
    boto3_session: boto3.Session | None = None,
    s3_additional_kwargs: dict[str, Any] | None = None,
) -> _ReadTableMetadataReturnValue:
    """Read Apache ORC file(s) metadata from an S3 prefix or list of S3 objects paths.

    The concept of `dataset` enables more complex features like partitioning
    and catalog integration (AWS Glue Catalog).

    This function accepts Unix shell-style wildcards in the path argument.
    * (matches everything), ? (matches any single character),
    [seq] (matches any character in seq), [!seq] (matches any character not in seq).
    If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`),
    you can use `glob.escape(path)` before passing the argument to this function.

    Note
    ----
    If `use_threads=True`, the number of threads is obtained from os.cpu_count().

    Parameters
    ----------
    path
        S3 prefix (accepts Unix shell-style wildcards)
        (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
    dataset
        If `True`, read an ORC dataset instead of individual file(s), loading all related partitions as columns.
    version_id
        Version id of the object or mapping of object path to version id.
        (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
    path_suffix
        Suffix or List of suffixes to be read (e.g. [".gz.orc", ".snappy.orc"]).
        If None, reads all files. (default)
    path_ignore_suffix
        Suffix or List of suffixes to be ignored. (e.g. [".csv", "_SUCCESS"]).
        If None, reads all files. (default)
    ignore_empty
        Ignore files with 0 bytes.
    ignore_null
        Ignore columns with null type.
    dtype
        Dictionary of columns names and Athena/Glue types to cast.
        Use when you have columns with undetermined data types as partitions columns.
        (e.g. {'col name': 'bigint', 'col2 name': 'int'})
    sampling
        Ratio of files metadata to inspect.
        Must be `0.0 < sampling <= 1.0`.
        The higher, the more accurate.
        The lower, the faster.
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the max number of threads.
        If integer is provided, specified number is used.
    boto3_session
        Boto3 Session. The default boto3 session is used if None is received.
    s3_additional_kwargs
        Forward to S3 botocore requests.

    Returns
    -------
        columns_types: Dictionary with keys as column names and values as
        data types (e.g. {'col0': 'bigint', 'col1': 'double'}). /
        partitions_types: Dictionary with keys as partition names
        and values as data types (e.g. {'col2': 'date'}).

    Examples
    --------
    Reading all ORC files (with partitions) metadata under a prefix

    >>> import awswrangler as wr
    >>> columns_types, partitions_types = wr.s3.read_orc_metadata(path='s3://bucket/prefix/', dataset=True)

    Reading all ORC files metadata from a list

    >>> import awswrangler as wr
    >>> columns_types, partitions_types = wr.s3.read_orc_metadata(path=[
    ...     's3://bucket/filename0.orc',
    ...     's3://bucket/filename1.orc',
    ... ])
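
    Inspecting only a sample of the files' metadata (a minimal sketch; the prefix is a placeholder)

    >>> import awswrangler as wr
    >>> columns_types, partitions_types = wr.s3.read_orc_metadata(
    ...     path='s3://bucket/prefix/',
    ...     dataset=True,
    ...     sampling=0.5,
    ... )
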
    """
    reader = _ORCTableMetadataReader()
    columns_types, partitions_types, _ = reader.read_table_metadata(
        path=path,
        version_id=version_id,
        path_suffix=path_suffix,
        path_ignore_suffix=path_ignore_suffix,
        ignore_empty=ignore_empty,
        ignore_null=ignore_null,
        dtype=dtype,
        sampling=sampling,
        dataset=dataset,
        use_threads=use_threads,
        s3_additional_kwargs=s3_additional_kwargs,
        boto3_session=boto3_session,
    )
    return _ReadTableMetadataReturnValue(columns_types, partitions_types)