def delete_objects()

in aws_codeseeder/services/s3.py [0:0]


def delete_objects(bucket: str, keys: Optional[List[str]] = None) -> None:
    """Delete objects from an S3 Bucket

    The keys are converted to ``{"Key": ...}`` entries, split by ``_chunkify``
    with ``max_length=1_000`` and each chunk is handed to ``_delete_objects``
    on a bounded thread pool. Nothing is done when there are no keys.

    Parameters
    ----------
    bucket : str
        S3 Bucket name
    keys : Optional[List[str]], optional
        List of keys to delete, all if None, by default None
    """
    # Upper bound on worker threads. Sizing the pool to the number of chunks
    # would make the thread count grow linearly with the number of objects
    # (e.g. 5M keys -> 5,000 threads), which can exhaust OS threads/sockets.
    max_workers_cap = 32

    if keys is None:
        keys_pairs: List[Dict[str, str]] = list_keys(bucket=bucket)
    else:
        keys_pairs = [{"Key": k} for k in keys]

    if not keys_pairs:
        # Nothing to delete; this also avoids building a pool with zero workers,
        # which ThreadPoolExecutor rejects with ValueError.
        return

    # 1_000 matches the per-request key limit of the S3 DeleteObjects API.
    chunks: List[List[Dict[str, str]]] = _chunkify(lst=keys_pairs, max_length=1_000)
    workers = min(len(chunks), max_workers_cap)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # Consuming the map iterator is what re-raises any exception raised
        # inside a worker; without list(...) failures would go unnoticed.
        list(executor.map(_delete_objects, repeat(bucket), chunks))