# awswrangler/s3/_read_parquet.py
"""Amazon S3 Read PARQUET Module (PRIVATE)."""
from __future__ import annotations
import datetime
import functools
import itertools
import logging
import warnings
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterator,
)
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.dataset
import pyarrow.parquet
from packaging import version
from typing_extensions import Literal
from awswrangler import _data_types, _utils, exceptions
from awswrangler._arrow import _add_table_partitions, _table_to_df
from awswrangler._config import apply_configs
from awswrangler._distributed import engine
from awswrangler._executor import _BaseExecutor, _get_executor
from awswrangler.distributed.ray import ray_get # noqa: F401
from awswrangler.s3._fs import open_s3_object
from awswrangler.s3._list import _path2list
from awswrangler.s3._read import (
_apply_partition_filter,
_check_version_id,
_concat_union_categoricals,
_extract_partitions_dtypes_from_table_details,
_get_num_output_blocks,
_get_path_ignore_suffix,
_get_path_root,
_get_paths_for_glue_table,
_InternalReadTableMetadataReturnValue,
_TableMetadataReader,
)
from awswrangler.typing import ArrowDecryptionConfiguration, RayReadParquetSettings, _ReadTableMetadataReturnValue
if TYPE_CHECKING:
from mypy_boto3_s3 import S3Client
BATCH_READ_BLOCK_SIZE = 65_536
CHUNKED_READ_S3_BLOCK_SIZE = 10_485_760 # 10 MB (10 * 2**20)
FULL_READ_S3_BLOCK_SIZE = 20_971_520 # 20 MB (20 * 2**20)
METADATA_READ_S3_BLOCK_SIZE = 131_072 # 128 KB (128 * 2**10)
_logger: logging.Logger = logging.getLogger(__name__)
def _pyarrow_parquet_file_wrapper(
source: Any,
coerce_int96_timestamp_unit: str | None = None,
decryption_properties: pyarrow.parquet.encryption.DecryptionConfiguration | None = None,
) -> pyarrow.parquet.ParquetFile:
try:
return pyarrow.parquet.ParquetFile(
source=source,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
decryption_properties=decryption_properties,
)
except pyarrow.ArrowInvalid as ex:
if str(ex) == "Parquet file size is 0 bytes":
_logger.warning("Ignoring empty file...")
return None
raise
@engine.dispatch_on_engine
def _read_parquet_metadata_file(
s3_client: "S3Client" | None,
path: str,
s3_additional_kwargs: dict[str, str] | None,
use_threads: bool | int,
version_id: str | None = None,
coerce_int96_timestamp_unit: str | None = None,
decryption_properties: pyarrow.parquet.encryption.DecryptionConfiguration | None = None,
) -> pa.Schema | None:
with open_s3_object(
path=path,
mode="rb",
version_id=version_id,
use_threads=use_threads,
s3_client=s3_client,
s3_block_size=METADATA_READ_S3_BLOCK_SIZE,
s3_additional_kwargs=s3_additional_kwargs,
) as f:
pq_file: pyarrow.parquet.ParquetFile | None = _pyarrow_parquet_file_wrapper(
source=f,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
decryption_properties=decryption_properties,
)
if pq_file:
return pq_file.schema.to_arrow_schema()
return None
class _ParquetTableMetadataReader(_TableMetadataReader):
def _read_metadata_file(
self,
s3_client: "S3Client" | None,
path: str,
s3_additional_kwargs: dict[str, str] | None,
use_threads: bool | int,
version_id: str | None = None,
coerce_int96_timestamp_unit: str | None = None,
) -> pa.Schema | None:
return _read_parquet_metadata_file(
s3_client=s3_client,
path=path,
s3_additional_kwargs=s3_additional_kwargs,
use_threads=use_threads,
version_id=version_id,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
)
def _read_parquet_metadata(
path: str | list[str],
path_suffix: str | None,
path_ignore_suffix: str | list[str] | None,
ignore_empty: bool,
ignore_null: bool,
dtype: dict[str, str] | None,
sampling: float,
dataset: bool,
use_threads: bool | int,
boto3_session: boto3.Session | None,
s3_additional_kwargs: dict[str, str] | None,
version_id: str | dict[str, str] | None = None,
coerce_int96_timestamp_unit: str | None = None,
) -> _InternalReadTableMetadataReturnValue:
"""Handle wr.s3.read_parquet_metadata internally."""
reader = _ParquetTableMetadataReader()
return reader.read_table_metadata(
path=path,
version_id=version_id,
path_suffix=path_suffix,
path_ignore_suffix=path_ignore_suffix,
ignore_empty=ignore_empty,
ignore_null=ignore_null,
dtype=dtype,
sampling=sampling,
dataset=dataset,
use_threads=use_threads,
s3_additional_kwargs=s3_additional_kwargs,
boto3_session=boto3_session,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
)
def _read_parquet_file(
s3_client: "S3Client" | None,
path: str,
path_root: str | None,
columns: list[str] | None,
coerce_int96_timestamp_unit: str | None,
s3_additional_kwargs: dict[str, str] | None,
use_threads: bool | int,
version_id: str | None = None,
schema: pa.Schema | None = None,
decryption_properties: pyarrow.parquet.encryption.DecryptionConfiguration | None = None,
) -> pa.Table:
s3_block_size: int = FULL_READ_S3_BLOCK_SIZE if columns else -1 # -1 reads the whole object in one shot; column-pruned reads use 20 MB blocks
with open_s3_object(
path=path,
mode="rb",
version_id=version_id,
use_threads=use_threads,
s3_block_size=s3_block_size,
s3_additional_kwargs=s3_additional_kwargs,
s3_client=s3_client,
) as f:
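# Passing an explicit schema to pyarrow.parquet.read_table requires pyarrow >= 8.0.0 (gated below).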
if schema and version.parse(pa.__version__) >= version.parse("8.0.0"):
try:
table = pyarrow.parquet.read_table(
f,
columns=columns,
schema=schema,
use_threads=False,
use_pandas_metadata=False,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
decryption_properties=decryption_properties,
)
except pyarrow.ArrowInvalid as ex:
if "Parquet file size is 0 bytes" in str(ex):
raise exceptions.InvalidFile(f"Invalid Parquet file: {path}")
raise
else:
if schema:
warnings.warn(
"Your version of pyarrow does not support reading with schema. Consider an upgrade to pyarrow 8+.",
UserWarning,
)
pq_file: pyarrow.parquet.ParquetFile | None = _pyarrow_parquet_file_wrapper(
source=f,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
decryption_properties=decryption_properties,
)
if pq_file is None:
raise exceptions.InvalidFile(f"Invalid Parquet file: {path}")
table = pq_file.read(columns=columns, use_threads=False, use_pandas_metadata=False)
return _add_table_partitions(
table=table,
path=path,
path_root=path_root,
)
def _read_parquet_chunked(
s3_client: "S3Client" | None,
paths: list[str],
path_root: str | None,
columns: list[str] | None,
coerce_int96_timestamp_unit: str | None,
chunked: int | bool,
use_threads: bool | int,
s3_additional_kwargs: dict[str, str] | None,
arrow_kwargs: dict[str, Any],
version_ids: dict[str, str] | None = None,
decryption_properties: pyarrow.parquet.encryption.DecryptionConfiguration | None = None,
) -> Iterator[pd.DataFrame]:
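# Rows carried over from the previous file when `chunked` is an integer row count.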
next_slice: pd.DataFrame | None = None
batch_size = BATCH_READ_BLOCK_SIZE if chunked is True else chunked
for path in paths:
with open_s3_object(
path=path,
version_id=version_ids.get(path) if version_ids else None,
mode="rb",
use_threads=use_threads,
s3_client=s3_client,
s3_block_size=CHUNKED_READ_S3_BLOCK_SIZE,
s3_additional_kwargs=s3_additional_kwargs,
) as f:
pq_file: pyarrow.parquet.ParquetFile | None = _pyarrow_parquet_file_wrapper(
source=f,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
decryption_properties=decryption_properties,
)
if pq_file is None:
continue
metadata = pq_file.metadata
schema = metadata.schema.to_arrow_schema()
if columns:
schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
table_kwargs = {"path": path, "path_root": path_root}
if metadata.num_rows > 0:
for chunk in pq_file.iter_batches(
batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
):
table = _add_table_partitions(table=pa.Table.from_batches([chunk], schema=schema), **table_kwargs)
df = _table_to_df(table=table, kwargs=arrow_kwargs)
if chunked is True:
yield df
else:
if next_slice is not None:
df = _concat_union_categoricals(dfs=[next_slice, df], ignore_index=False)
while len(df.index) >= chunked:
yield df.iloc[:chunked, :].copy()
df = df.iloc[chunked:, :]
if df.empty:
next_slice = None
else:
next_slice = df
else:
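# Zero-row file: still yield one empty DataFrame so the schema and partition columns are surfaced.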
table = _add_table_partitions(table=pa.Table.from_batches([], schema=schema), **table_kwargs)
df = _table_to_df(table=table, kwargs=arrow_kwargs)
yield df
if next_slice is not None:
yield next_slice
@engine.dispatch_on_engine
def _read_parquet(
paths: list[str],
path_root: str | None,
schema: pa.Schema | None,
columns: list[str] | None,
coerce_int96_timestamp_unit: str | None,
use_threads: bool | int,
override_num_blocks: int,
version_ids: dict[str, str] | None,
s3_client: "S3Client" | None,
s3_additional_kwargs: dict[str, Any] | None,
arrow_kwargs: dict[str, Any],
bulk_read: bool,
decryption_properties: pyarrow.parquet.encryption.DecryptionConfiguration | None = None,
) -> pd.DataFrame:
executor: _BaseExecutor = _get_executor(use_threads=use_threads)
tables = executor.map(
_read_parquet_file,
s3_client,
paths,
itertools.repeat(path_root),
itertools.repeat(columns),
itertools.repeat(coerce_int96_timestamp_unit),
itertools.repeat(s3_additional_kwargs),
itertools.repeat(use_threads),
[version_ids.get(p) if isinstance(version_ids, dict) else None for p in paths],
itertools.repeat(schema),
itertools.repeat(decryption_properties),
)
# When the first table in a dataset is empty, its inferred schema may not
# be compatible with the other tables, which would raise an exception when
# concatenating them down the line. As a workaround, we filter out empty
# tables unless every table is empty; in that case the schemas are
# compatible, so nothing is filtered.
should_filter_out = any(len(table) > 0 for table in tables)
if should_filter_out:
tables = [table for table in tables if len(table) > 0]
return _utils.table_refs_to_df(tables, kwargs=arrow_kwargs)
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session", "version_id", "s3_additional_kwargs", "dtype_backend"],
)
@apply_configs
def read_parquet(
path: str | list[str],
path_root: str | None = None,
dataset: bool = False,
path_suffix: str | list[str] | None = None,
path_ignore_suffix: str | list[str] | None = None,
ignore_empty: bool = True,
partition_filter: Callable[[dict[str, str]], bool] | None = None,
columns: list[str] | None = None,
validate_schema: bool = False,
coerce_int96_timestamp_unit: str | None = None,
schema: pa.Schema | None = None,
last_modified_begin: datetime.datetime | None = None,
last_modified_end: datetime.datetime | None = None,
version_id: str | dict[str, str] | None = None,
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
chunked: bool | int = False,
use_threads: bool | int = True,
ray_args: RayReadParquetSettings | None = None,
boto3_session: boto3.Session | None = None,
s3_additional_kwargs: dict[str, Any] | None = None,
pyarrow_additional_kwargs: dict[str, Any] | None = None,
decryption_configuration: ArrowDecryptionConfiguration | None = None,
) -> pd.DataFrame | Iterator[pd.DataFrame]:
"""Read Parquet file(s) from an S3 prefix or list of S3 objects paths.
The concept of `dataset` enables more complex features like partitioning
and catalog integration (AWS Glue Catalog).
This function accepts Unix shell-style wildcards in the path argument.
* (matches everything), ? (matches any single character),
[seq] (matches any character in seq), [!seq] (matches any character not in seq).
If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`),
you can use `glob.escape(path)` before passing the argument to this function.
Note
----
``Batching`` (`chunked` argument) (Memory Friendly):
Used to return an Iterable of DataFrames instead of a regular DataFrame.
Two batching strategies are available:
- If **chunked=True**, depending on the size of the data, one or more data frames are returned per file in the path/dataset.
Unlike **chunked=INTEGER**, rows from different files are not mixed in the resulting data frames.
- If **chunked=INTEGER**, awswrangler iterates on the data by number of rows equal to the received INTEGER.
`P.S.` `chunked=True` is faster and uses less memory while `chunked=INTEGER` is more precise
in the number of rows.
Note
----
If `use_threads=True`, the number of threads is obtained from os.cpu_count().
Note
----
Filtering by `last_modified_begin` and `last_modified_end` is applied after listing all S3 files.
Parameters
----------
path
S3 prefix (accepts Unix shell-style wildcards)
(e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
path_root
Root path of the dataset. If dataset=`True`, it is used as a starting point to load partition columns.
dataset
If `True`, read a parquet dataset instead of individual file(s), loading all related partitions as columns.
path_suffix
Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
If None, reads all files. (default)
path_ignore_suffix
Suffix or List of suffixes to be ignored (e.g. [".csv", "_SUCCESS"]).
If None, reads all files. (default)
ignore_empty
Ignore files with 0 bytes.
partition_filter
Callback Function filters to apply on PARTITION columns (PUSH-DOWN filter).
This function must receive a single argument (Dict[str, str]) where keys are partitions
names and values are partitions values. Partitions values must be strings and the function
must return a bool, True to read the partition or False to ignore it.
Ignored if `dataset=False`.
E.g. ``lambda x: True if x["year"] == "2020" and x["month"] == "1" else False``
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/023%20-%20Flexible%20Partitions%20Filter.html
columns
List of columns to read from the file(s).
validate_schema
Check that the schema is consistent across individual files.
coerce_int96_timestamp_unit
Cast timestamps that are stored in INT96 format to a particular resolution (e.g. "ms").
Setting to None is equivalent to "ns" and therefore INT96 timestamps are inferred as in nanoseconds.
schema
Schema to use when reading the file.
last_modified_begin
Filter S3 objects by Last modified date.
Filter is only applied after listing all objects.
last_modified_end
Filter S3 objects by Last modified date.
Filter is only applied after listing all objects.
version_id
Version id of the object or mapping of object path to version id.
(e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
dtype_backend
Which dtype_backend to use: when "numpy_nullable" is set, nullable dtypes are used for all
dtypes that have a nullable implementation; when "pyarrow" is set, pyarrow-backed dtypes are
used for all columns.
The dtype_backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above.
chunked
If passed, the data is split into an iterable of DataFrames (Memory friendly).
If `True`, an iterable of DataFrames is returned without guarantee of chunksize.
If an `INTEGER` is passed, an iterable of DataFrames is returned with maximum rows
equal to the received INTEGER.
use_threads
True to enable concurrent requests, False to disable multiple threads.
If enabled, os.cpu_count() is used as the max number of threads.
If integer is provided, specified number is used.
ray_args
Parameters of the Ray Modin settings. Only used when distributed computing is used with Ray and Modin installed.
boto3_session
Boto3 Session. The default boto3 session is used if None is received.
s3_additional_kwargs
Forward to S3 botocore requests.
pyarrow_additional_kwargs
Forwarded to `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
Valid values include "split_blocks", "self_destruct", "ignore_metadata".
e.g. pyarrow_additional_kwargs={'split_blocks': True}.
decryption_configuration
``pyarrow.parquet.encryption.CryptoFactory`` and ``pyarrow.parquet.encryption.KmsConnectionConfig`` objects dict
used to create a PyArrow ``CryptoFactory.file_decryption_properties`` object to forward to PyArrow reader.
see: https://arrow.apache.org/docs/python/parquet.html#decryption-configuration
Client Decryption is not supported in distributed mode.
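A hypothetical usage sketch is shown at the end of the Examples section below.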
Returns
-------
Pandas DataFrame or a Generator in case of `chunked=True`.
Examples
--------
Reading all Parquet files under a prefix
>>> import awswrangler as wr
>>> df = wr.s3.read_parquet(path='s3://bucket/prefix/')
Reading all Parquet files from a list
>>> import awswrangler as wr
>>> df = wr.s3.read_parquet(path=['s3://bucket/filename0.parquet', 's3://bucket/filename1.parquet'])
Reading in chunks (Chunk by file)
>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet(path=['s3://bucket/filename0.parquet', 's3://bucket/filename1.parquet'], chunked=True)
>>> for df in dfs:
>>> print(df) # Smaller Pandas DataFrame
Reading in chunks (Chunk by 1MM rows)
>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet(
... path=['s3://bucket/filename0.parquet', 's3://bucket/filename1.parquet'],
... chunked=1_000_000
... )
>>> for df in dfs:
>>> print(df) # 1MM Pandas DataFrame
Reading Parquet Dataset with PUSH-DOWN filter over partitions
>>> import awswrangler as wr
>>> my_filter = lambda x: True if x["city"].startswith("new") else False
>>> df = wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
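Reading encrypted Parquet objects with client-side decryption (a hypothetical sketch:
``my_kms_client_factory`` is a placeholder for a user-supplied callable returning a
``pyarrow.parquet.encryption.KmsClient`` implementation, and the bucket prefix is a placeholder)
>>> import awswrangler as wr
>>> import pyarrow.parquet.encryption as pqe
>>> crypto_factory = pqe.CryptoFactory(my_kms_client_factory)
>>> kms_config = pqe.KmsConnectionConfig() # configure for your KMS setup
>>> df = wr.s3.read_parquet(
...     path='s3://bucket/prefix/',
...     decryption_configuration={
...         'crypto_factory': crypto_factory,
...         'kms_connection_config': kms_config,
...     },
... )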
"""
ray_args = ray_args if ray_args else {}
bulk_read = ray_args.get("bulk_read", False)
if bulk_read and validate_schema:
raise exceptions.InvalidArgumentCombination("Cannot validate schema when bulk reading data files.")
s3_client = _utils.client(service_name="s3", session=boto3_session)
paths: list[str] = _path2list(
path=path,
s3_client=s3_client,
suffix=path_suffix,
ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end,
ignore_empty=ignore_empty,
s3_additional_kwargs=s3_additional_kwargs,
)
if not path_root:
path_root = _get_path_root(path=path, dataset=dataset)
if path_root and partition_filter:
paths = _apply_partition_filter(path_root=path_root, paths=paths, filter_func=partition_filter)
if len(paths) < 1:
raise exceptions.NoFilesFound(f"No files found on: {path}.")
version_ids = _check_version_id(paths=paths, version_id=version_id)
# Create PyArrow schema based on file metadata, columns filter, and partitions
if validate_schema and not bulk_read:
metadata_reader = _ParquetTableMetadataReader()
schema = metadata_reader.validate_schemas(
paths=paths,
path_root=path_root,
columns=columns,
validate_schema=validate_schema,
s3_client=s3_client,
version_ids=version_ids,
use_threads=use_threads,
s3_additional_kwargs=s3_additional_kwargs,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
)
decryption_properties = (
decryption_configuration["crypto_factory"].file_decryption_properties(
decryption_configuration["kms_connection_config"]
)
if decryption_configuration
else None
)
arrow_kwargs = _data_types.pyarrow2pandas_defaults(
use_threads=use_threads, kwargs=pyarrow_additional_kwargs, dtype_backend=dtype_backend
)
if chunked:
return _read_parquet_chunked(
s3_client=s3_client,
paths=paths,
path_root=path_root,
columns=columns,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
chunked=chunked,
use_threads=use_threads,
s3_additional_kwargs=s3_additional_kwargs,
arrow_kwargs=arrow_kwargs,
version_ids=version_ids,
decryption_properties=decryption_properties,
)
return _read_parquet(
paths,
path_root=path_root,
schema=schema,
columns=columns,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
use_threads=use_threads,
override_num_blocks=_get_num_output_blocks(ray_args),
s3_client=s3_client,
s3_additional_kwargs=s3_additional_kwargs,
arrow_kwargs=arrow_kwargs,
version_ids=version_ids,
bulk_read=bulk_read,
decryption_properties=decryption_properties,
)
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session", "s3_additional_kwargs", "dtype_backend"],
)
@apply_configs
def read_parquet_table(
table: str,
database: str,
filename_suffix: str | list[str] | None = None,
filename_ignore_suffix: str | list[str] | None = None,
catalog_id: str | None = None,
partition_filter: Callable[[dict[str, str]], bool] | None = None,
columns: list[str] | None = None,
validate_schema: bool = True,
coerce_int96_timestamp_unit: str | None = None,
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
chunked: bool | int = False,
use_threads: bool | int = True,
ray_args: RayReadParquetSettings | None = None,
boto3_session: boto3.Session | None = None,
s3_additional_kwargs: dict[str, Any] | None = None,
pyarrow_additional_kwargs: dict[str, Any] | None = None,
decryption_configuration: ArrowDecryptionConfiguration | None = None,
) -> pd.DataFrame | Iterator[pd.DataFrame]:
"""Read Apache Parquet table registered in the AWS Glue Catalog.
Note
----
``Batching`` (`chunked` argument) (Memory Friendly):
Used to return an Iterable of DataFrames instead of a regular DataFrame.
Two batching strategies are available:
- If **chunked=True**, depending on the size of the data, one or more data frames are returned per file in the path/dataset.
Unlike **chunked=INTEGER**, rows from different files will not be mixed in the resulting data frames.
- If **chunked=INTEGER**, awswrangler iterates on the data by number of rows equal to the received INTEGER.
`P.S.` `chunked=True` is faster and uses less memory while `chunked=INTEGER` is more precise
in the number of rows.
Note
----
If `use_threads=True`, the number of threads is obtained from os.cpu_count().
Parameters
----------
table
AWS Glue Catalog table name.
database
AWS Glue Catalog database name.
filename_suffix
Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
If None, read all files. (default)
filename_ignore_suffix
Suffix or List of suffixes for S3 keys to be ignored (e.g. [".csv", "_SUCCESS"]).
If None, read all files. (default)
catalog_id
The ID of the Data Catalog from which to retrieve Databases.
If none is provided, the AWS account ID is used by default.
partition_filter
Callback Function filters to apply on PARTITION columns (PUSH-DOWN filter).
This function must receive a single argument (Dict[str, str]) where keys are partitions
names and values are partitions values. Partitions values must be strings and the function
must return a bool, True to read the partition or False to ignore it.
E.g. ``lambda x: True if x["year"] == "2020" and x["month"] == "1" else False``
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/023%20-%20Flexible%20Partitions%20Filter.html
columns
List of columns to read from the file(s).
validate_schema
Check that the schema is consistent across individual files.
coerce_int96_timestamp_unit
Cast timestamps that are stored in INT96 format to a particular resolution (e.g. "ms").
Setting to None is equivalent to "ns" and therefore INT96 timestamps are inferred as in nanoseconds.
dtype_backend
Which dtype_backend to use: when "numpy_nullable" is set, nullable dtypes are used for all
dtypes that have a nullable implementation; when "pyarrow" is set, pyarrow-backed dtypes are
used for all columns.
The dtype_backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above.
chunked
If passed, the data is split into an iterable of DataFrames (Memory friendly).
If `True`, an iterable of DataFrames is returned without guarantee of chunksize.
If an `INTEGER` is passed, an iterable of DataFrames is returned with maximum rows
equal to the received INTEGER.
use_threads
True to enable concurrent requests, False to disable multiple threads.
If enabled, os.cpu_count() is used as the max number of threads.
If integer is provided, specified number is used.
ray_args
Parameters of the Ray Modin settings. Only used when distributed computing is used with Ray and Modin installed.
boto3_session
Boto3 Session. The default boto3 session is used if None is received.
s3_additional_kwargs
Forward to S3 botocore requests.
pyarrow_additional_kwargs
Forwarded to `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
Valid values include "split_blocks", "self_destruct", "ignore_metadata".
e.g. pyarrow_additional_kwargs={'split_blocks': True}.
decryption_configuration
``pyarrow.parquet.encryption.CryptoFactory`` and ``pyarrow.parquet.encryption.KmsConnectionConfig`` objects dict
used to create a PyArrow ``CryptoFactory.file_decryption_properties`` object to forward to PyArrow reader.
Client Decryption is not supported in distributed mode.
Returns
-------
Pandas DataFrame or a Generator in case of `chunked=True`.
Examples
--------
Reading Parquet Table
>>> import awswrangler as wr
>>> df = wr.s3.read_parquet_table(database='...', table='...')
Reading Parquet Table in chunks (Chunk by file)
>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet_table(database='...', table='...', chunked=True)
>>> for df in dfs:
>>> print(df) # Smaller Pandas DataFrame
Reading a Parquet Table with a PUSH-DOWN filter over partitions
>>> import awswrangler as wr
>>> my_filter = lambda x: True if x["city"].startswith("new") else False
>>> df = wr.s3.read_parquet_table(database='...', table='...', partition_filter=my_filter)
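Reading a Parquet Table in chunks of 1MM rows (a sketch; database and table names are placeholders)
>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet_table(database='...', table='...', chunked=1_000_000)
>>> for df in dfs:
>>> print(df) # 1MM Pandas DataFrame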
"""
paths: str | list[str]
path_root: str | None
paths, path_root, res = _get_paths_for_glue_table(
table=table,
database=database,
filename_suffix=filename_suffix,
filename_ignore_suffix=filename_ignore_suffix,
catalog_id=catalog_id,
partition_filter=partition_filter,
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
)
df = read_parquet(
path=paths,
path_root=path_root,
dataset=True,
path_suffix=filename_suffix if path_root is None else None,
path_ignore_suffix=filename_ignore_suffix if path_root is None else None,
columns=columns,
validate_schema=validate_schema,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
dtype_backend=dtype_backend,
chunked=chunked,
use_threads=use_threads,
ray_args=ray_args,
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
decryption_configuration=decryption_configuration,
)
partial_cast_function = functools.partial(
_data_types.cast_pandas_with_athena_types,
dtype=_extract_partitions_dtypes_from_table_details(response=res),
dtype_backend=dtype_backend,
)
if _utils.is_pandas_frame(df):
return partial_cast_function(df)
# df is a generator, so map is needed for casting dtypes
return map(partial_cast_function, df)
@apply_configs
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session"],
)
def read_parquet_metadata(
path: str | list[str],
dataset: bool = False,
version_id: str | dict[str, str] | None = None,
path_suffix: str | None = None,
path_ignore_suffix: str | list[str] | None = None,
ignore_empty: bool = True,
ignore_null: bool = False,
dtype: dict[str, str] | None = None,
sampling: float = 1.0,
coerce_int96_timestamp_unit: str | None = None,
use_threads: bool | int = True,
boto3_session: boto3.Session | None = None,
s3_additional_kwargs: dict[str, Any] | None = None,
) -> _ReadTableMetadataReturnValue:
"""Read Apache Parquet file(s) metadata from an S3 prefix or list of S3 objects paths.
The concept of `dataset` enables more complex features like partitioning
and catalog integration (AWS Glue Catalog).
This function accepts Unix shell-style wildcards in the path argument.
* (matches everything), ? (matches any single character),
[seq] (matches any character in seq), [!seq] (matches any character not in seq).
If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`),
you can use `glob.escape(path)` before passing the argument to this function.
Note
----
If `use_threads=True`, the number of threads is obtained from os.cpu_count().
Parameters
----------
path
S3 prefix (accepts Unix shell-style wildcards)
(e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
dataset
If `True`, read a parquet dataset instead of individual file(s), loading all related partitions as columns.
version_id
Version id of the object or mapping of object path to version id.
(e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
path_suffix
Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
If None, reads all files. (default)
path_ignore_suffix
Suffix or List of suffixes to be ignored (e.g. [".csv", "_SUCCESS"]).
If None, reads all files. (default)
ignore_empty
Ignore files with 0 bytes.
ignore_null
Ignore columns with null type.
dtype
Dictionary of columns names and Athena/Glue types to cast.
Use when you have columns with undetermined data types as partitions columns.
(e.g. {'col name': 'bigint', 'col2 name': 'int'})
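See the example at the end of the Examples section below.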
sampling
Ratio of files metadata to inspect.
Must be `0.0 < sampling <= 1.0`.
The higher, the more accurate.
The lower, the faster.
use_threads
True to enable concurrent requests, False to disable multiple threads.
If enabled, os.cpu_count() is used as the max number of threads.
If integer is provided, specified number is used.
boto3_session
Boto3 Session. The default boto3 session is used if None is received.
s3_additional_kwargs
Forward to S3 botocore requests.
Returns
-------
columns_types: Dictionary with keys as column names and values as
data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
partitions_types: Dictionary with keys as partition names
and values as data types (e.g. {'col2': 'date'}).
Examples
--------
Reading all Parquet files (with partitions) metadata under a prefix
>>> import awswrangler as wr
>>> columns_types, partitions_types = wr.s3.read_parquet_metadata(path='s3://bucket/prefix/', dataset=True)
Reading all Parquet files metadata from a list
>>> import awswrangler as wr
>>> columns_types, partitions_types = wr.s3.read_parquet_metadata(path=[
... 's3://bucket/filename0.parquet',
... 's3://bucket/filename1.parquet'
... ])
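Casting an undetermined partition column explicitly via ``dtype`` (a sketch; the prefix and the
'year' partition column are placeholders)
>>> import awswrangler as wr
>>> columns_types, partitions_types = wr.s3.read_parquet_metadata(
...     path='s3://bucket/prefix/',
...     dataset=True,
...     dtype={'year': 'int'},
... )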
"""
columns_types, partitions_types, _ = _read_parquet_metadata(
path=path,
version_id=version_id,
path_suffix=path_suffix,
path_ignore_suffix=path_ignore_suffix,
ignore_empty=ignore_empty,
ignore_null=ignore_null,
dtype=dtype,
sampling=sampling,
dataset=dataset,
use_threads=use_threads,
s3_additional_kwargs=s3_additional_kwargs,
boto3_session=boto3_session,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
)
return _ReadTableMetadataReturnValue(columns_types, partitions_types)