awswrangler/s3/_describe.py (94 lines of code) (raw):

"""Amazon S3 Describe Module (INTERNAL).""" from __future__ import annotations import datetime import itertools import logging from typing import TYPE_CHECKING, Any, Dict, cast import boto3 from awswrangler import _utils from awswrangler._distributed import engine from awswrangler._executor import _BaseExecutor, _get_executor from awswrangler.distributed.ray import ray_get from awswrangler.s3 import _fs from awswrangler.s3._list import _path2list if TYPE_CHECKING: from mypy_boto3_s3 import S3Client _logger: logging.Logger = logging.getLogger(__name__) @engine.dispatch_on_engine def _describe_object( s3_client: "S3Client", path: str, s3_additional_kwargs: dict[str, Any] | None, version_id: str | None = None, ) -> tuple[str, dict[str, Any]]: s3_client = s3_client if s3_client else _utils.client(service_name="s3") bucket, key = _utils.parse_path(path=path) if s3_additional_kwargs: extra_kwargs: dict[str, Any] = _fs.get_botocore_valid_kwargs( function_name="head_object", s3_additional_kwargs=s3_additional_kwargs ) else: extra_kwargs = {} if version_id: extra_kwargs["VersionId"] = version_id desc = _utils.try_it( f=s3_client.head_object, ex=s3_client.exceptions.NoSuchKey, Bucket=bucket, Key=key, **extra_kwargs ) return path, cast(Dict[str, Any], desc) @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session", "s3_additional_kwargs"], ) def describe_objects( path: str | list[str], version_id: str | dict[str, str] | None = None, use_threads: bool | int = True, last_modified_begin: datetime.datetime | None = None, last_modified_end: datetime.datetime | None = None, s3_additional_kwargs: dict[str, Any] | None = None, boto3_session: boto3.Session | None = None, ) -> dict[str, dict[str, Any]]: """Describe Amazon S3 objects from a received S3 prefix or list of S3 objects paths. Fetch attributes like ContentLength, DeleteMarker, last_modified, ContentType, etc The full list of attributes can be explored under the boto3 head_object documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.head_object This function accepts Unix shell-style wildcards in the path argument. * (matches everything), ? (matches any single character), [seq] (matches any character in seq), [!seq] (matches any character not in seq). If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`), you can use `glob.escape(path)` before passing the path to this function. Note ---- In case of `use_threads=True` the number of threads that will be spawned will be gotten from os.cpu_count(). Note ---- The filter by last_modified begin last_modified end is applied after list all S3 files Parameters ---------- path S3 prefix (accepts Unix shell-style wildcards) (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]). version_id Version id of the object or mapping of object path to version id. (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'}) use_threads True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. If integer is provided, specified number is used. last_modified_begin Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. last_modified_end Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. s3_additional_kwargs Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- Return a dictionary of objects returned from head_objects where the key is the object path. The response object can be explored here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.head_object Examples -------- >>> import awswrangler as wr >>> descs0 = wr.s3.describe_objects(['s3://bucket/key0', 's3://bucket/key1']) # Describe both objects >>> descs1 = wr.s3.describe_objects('s3://bucket/prefix') # Describe all objects under the prefix """ s3_client = _utils.client(service_name="s3", session=boto3_session) paths = _path2list( path=path, s3_client=s3_client, last_modified_begin=last_modified_begin, last_modified_end=last_modified_end, s3_additional_kwargs=s3_additional_kwargs, ) if len(paths) < 1: return {} executor: _BaseExecutor = _get_executor(use_threads=use_threads) resp_list = ray_get( executor.map( _describe_object, s3_client, paths, itertools.repeat(s3_additional_kwargs), [version_id.get(p) if isinstance(version_id, dict) else version_id for p in paths], ) ) return dict(resp_list) @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session", "s3_additional_kwargs"], ) def size_objects( path: str | list[str], version_id: str | dict[str, str] | None = None, use_threads: bool | int = True, s3_additional_kwargs: dict[str, Any] | None = None, boto3_session: boto3.Session | None = None, ) -> dict[str, int | None]: """Get the size (ContentLength) in bytes of Amazon S3 objects from a received S3 prefix or list of S3 objects paths. This function accepts Unix shell-style wildcards in the path argument. * (matches everything), ? (matches any single character), [seq] (matches any character in seq), [!seq] (matches any character not in seq). If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`), you can use `glob.escape(path)` before passing the path to this function. Note ---- In case of `use_threads=True` the number of threads that will be spawned will be gotten from os.cpu_count(). Parameters ---------- path S3 prefix (accepts Unix shell-style wildcards) (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]). version_id Version id of the object or mapping of object path to version id. (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'}) use_threads True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. If integer is provided, specified number is used. s3_additional_kwargs Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- Dictionary where the key is the object path and the value is the object size. Examples -------- >>> import awswrangler as wr >>> sizes0 = wr.s3.size_objects(['s3://bucket/key0', 's3://bucket/key1']) # Get the sizes of both objects >>> sizes1 = wr.s3.size_objects('s3://bucket/prefix') # Get the sizes of all objects under the received prefix """ desc_list = describe_objects( path=path, version_id=version_id, use_threads=use_threads, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, ) return {k: d.get("ContentLength", None) for k, d in desc_list.items()} def get_bucket_region(bucket: str, boto3_session: boto3.Session | None = None) -> str: """Get bucket region name. Parameters ---------- bucket Bucket name. boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. Returns ------- Region code (e.g. 'us-east-1'). Examples -------- Using the default boto3 session >>> import awswrangler as wr >>> region = wr.s3.get_bucket_region('bucket-name') Using a custom boto3 session >>> import boto3 >>> import awswrangler as wr >>> region = wr.s3.get_bucket_region('bucket-name', boto3_session=boto3.Session()) """ client_s3 = _utils.client(service_name="s3", session=boto3_session) _logger.debug("bucket: %s", bucket) region: str = client_s3.get_bucket_location(Bucket=bucket)["LocationConstraint"] region = "us-east-1" if region is None else region _logger.debug("region: %s", region) return region