awswrangler/s3/_copy.py (137 lines of code) (raw):

"""Amazon S3 Copy Module (PRIVATE).""" from __future__ import annotations import itertools import logging from typing import TYPE_CHECKING, Any, Literal import boto3 from boto3.s3.transfer import TransferConfig from awswrangler import _utils, exceptions from awswrangler._distributed import engine from awswrangler._executor import _BaseExecutor, _get_executor from awswrangler.distributed.ray import ray_get from awswrangler.s3._delete import delete_objects from awswrangler.s3._fs import get_botocore_valid_kwargs from awswrangler.s3._list import list_objects if TYPE_CHECKING: from mypy_boto3_s3 import S3Client from mypy_boto3_s3.type_defs import CopySourceTypeDef _logger: logging.Logger = logging.getLogger(__name__) @engine.dispatch_on_engine def _copy_objects( s3_client: "S3Client" | None, batch: list[tuple[str, str]], use_threads: bool | int, s3_additional_kwargs: dict[str, Any] | None, ) -> None: _logger.debug("Copying %s objects", len(batch)) s3_client = s3_client if s3_client else _utils.client(service_name="s3") for source, target in batch: source_bucket, source_key = _utils.parse_path(path=source) copy_source: CopySourceTypeDef = {"Bucket": source_bucket, "Key": source_key} target_bucket, target_key = _utils.parse_path(path=target) s3_client.copy( CopySource=copy_source, Bucket=target_bucket, Key=target_key, ExtraArgs=s3_additional_kwargs, Config=TransferConfig(num_download_attempts=10, use_threads=use_threads), # type: ignore[arg-type] ) def _copy( batches: list[list[tuple[str, str]]], use_threads: bool | int, boto3_session: boto3.Session | None, s3_additional_kwargs: dict[str, Any] | None, ) -> None: s3_client = _utils.client(service_name="s3", session=boto3_session) if s3_additional_kwargs is None: boto3_kwargs: dict[str, Any] | None = None else: boto3_kwargs = get_botocore_valid_kwargs(function_name="copy_object", s3_additional_kwargs=s3_additional_kwargs) executor: _BaseExecutor = _get_executor(use_threads=use_threads) ray_get( executor.map( _copy_objects, s3_client, batches, itertools.repeat(use_threads), itertools.repeat(boto3_kwargs), ) ) @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session"], ) def merge_datasets( source_path: str, target_path: str, mode: Literal["append", "overwrite", "overwrite_partitions"] = "append", ignore_empty: bool = False, use_threads: bool | int = True, boto3_session: boto3.Session | None = None, s3_additional_kwargs: dict[str, Any] | None = None, ) -> list[str]: """Merge a source dataset into a target dataset. This function accepts Unix shell-style wildcards in the source_path argument. * (matches everything), ? (matches any single character), [seq] (matches any character in seq), [!seq] (matches any character not in seq). If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`), you can use `glob.escape(source_path)` before passing the path to this function. Note ---- If you are merging tables (S3 datasets + Glue Catalog metadata), remember that you will also need to update your partitions metadata in some cases. (e.g. wr.athena.repair_table(table='...', database='...')) Note ---- In case of `use_threads=True` the number of threads that will be spawned will be gotten from os.cpu_count(). Parameters ---------- source_path S3 Path for the source directory. target_path S3 Path for the target directory. mode ``append`` (Default), ``overwrite``, ``overwrite_partitions``. ignore_empty Ignore files with 0 bytes. use_threads True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. If integer is provided, specified number is used. boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. s3_additional_kwargs Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} Returns ------- List of new objects paths. Examples -------- Merging >>> import awswrangler as wr >>> wr.s3.merge_datasets( ... source_path="s3://bucket0/dir0/", ... target_path="s3://bucket1/dir1/", ... mode="append" ... ) ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] Merging with a KMS key >>> import awswrangler as wr >>> wr.s3.merge_datasets( ... source_path="s3://bucket0/dir0/", ... target_path="s3://bucket1/dir1/", ... mode="append", ... s3_additional_kwargs={ ... 'ServerSideEncryption': 'aws:kms', ... 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN' ... } ... ) ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] """ source_path = source_path[:-1] if source_path[-1] == "/" else source_path target_path = target_path[:-1] if target_path[-1] == "/" else target_path paths: list[str] = list_objects(path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=boto3_session) if len(paths) < 1: return [] if mode == "overwrite": _logger.debug("Deleting to overwrite: %s/", target_path) delete_objects(path=f"{target_path}/", use_threads=use_threads, boto3_session=boto3_session) elif mode == "overwrite_partitions": paths_wo_prefix: list[str] = [x.replace(f"{source_path}/", "") for x in paths] paths_wo_filename: list[str] = [f"{x.rpartition('/')[0]}/" for x in paths_wo_prefix] partitions_paths: list[str] = list(set(paths_wo_filename)) target_partitions_paths = [f"{target_path}/{x}" for x in partitions_paths] for path in target_partitions_paths: _logger.debug("Deleting to overwrite_partitions: %s", path) delete_objects(path=path, use_threads=use_threads, boto3_session=boto3_session) elif mode != "append": raise exceptions.InvalidArgumentValue(f"{mode} is a invalid mode option.") new_objects: list[str] = copy_objects( paths=paths, source_path=source_path, target_path=target_path, use_threads=use_threads, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, ) return new_objects @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session"], ) def copy_objects( paths: list[str], source_path: str, target_path: str, replace_filenames: dict[str, str] | None = None, use_threads: bool | int = True, boto3_session: boto3.Session | None = None, s3_additional_kwargs: dict[str, Any] | None = None, ) -> list[str]: """Copy a list of S3 objects to another S3 directory. Note ---- In case of `use_threads=True` the number of threads that will be spawned will be gotten from `os.cpu_count()`. Parameters ---------- paths List of S3 objects paths (e.g. ``["s3://bucket/dir0/key0", "s3://bucket/dir0/key1"]``). source_path S3 Path for the source directory. target_path S3 Path for the target directory. replace_filenames e.g. ``{"old_name.csv": "new_name.csv", "old_name2.csv": "new_name2.csv"}`` use_threads True to enable concurrent requests, False to disable multiple threads. If enabled ``os.cpu_count()`` will be used as the max number of threads. If integer is provided, specified number is used. boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. s3_additional_kwargs Forwarded to botocore requests. e.g. ``s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}`` Returns ------- List of new objects paths. Examples -------- Copying >>> import awswrangler as wr >>> wr.s3.copy_objects( ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"], ... source_path="s3://bucket0/dir0/", ... target_path="s3://bucket1/dir1/" ... ) ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] Copying with a KMS key >>> import awswrangler as wr >>> wr.s3.copy_objects( ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"], ... source_path="s3://bucket0/dir0/", ... target_path="s3://bucket1/dir1/", ... s3_additional_kwargs={ ... 'ServerSideEncryption': 'aws:kms', ... 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN' ... } ... ) ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] """ if len(paths) < 1: return [] source_path = source_path[:-1] if source_path[-1] == "/" else source_path target_path = target_path[:-1] if target_path[-1] == "/" else target_path batch: list[tuple[str, str]] = [] new_objects: list[str] = [] for path in paths: path_wo_prefix: str = path.replace(f"{source_path}/", "") path_final: str = f"{target_path}/{path_wo_prefix}" if replace_filenames is not None: parts: list[str] = path_final.rsplit(sep="/", maxsplit=1) if len(parts) == 2: path_wo_filename: str = parts[0] filename: str = parts[1] if filename in replace_filenames: new_filename: str = replace_filenames[filename] _logger.debug("Replacing filename: %s -> %s", filename, new_filename) path_final = f"{path_wo_filename}/{new_filename}" new_objects.append(path_final) batch.append((path, path_final)) _logger.debug("Creating %s new objects", len(new_objects)) _copy( batches=_utils.chunkify(lst=batch, max_length=1_000), use_threads=use_threads, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, ) return new_objects