in backend/ecs_tasks/delete_files/s3.py [0:0]
def delete_old_versions(client, input_bucket, input_key, new_version):
try:
resp = list(
paginate(
client,
client.list_object_versions,
["Versions", "DeleteMarkers"],
Bucket=input_bucket,
Prefix=input_key,
VersionIdMarker=new_version,
KeyMarker=input_key,
)
)
versions = [el[0] for el in resp if el[0] is not None]
delete_markers = [el[1] for el in resp if el[1] is not None]
versions.extend(delete_markers)
sorted_versions = sorted(versions, key=lambda x: x["LastModified"])
version_ids = [v["VersionId"] for v in sorted_versions]
errors = []
max_deletions = 1000
for i in range(0, len(version_ids), max_deletions):
resp = client.delete_objects(
Bucket=input_bucket,
Delete={
"Objects": [
{"Key": input_key, "VersionId": version_id}
for version_id in version_ids[i : i + max_deletions]
],
"Quiet": True,
},
)
errors.extend(resp.get("Errors", []))
if len(errors) > 0:
raise DeleteOldVersionsError(
errors=[
"Delete object {} version {} failed: {}".format(
e["Key"], e["VersionId"], e["Message"]
)
for e in errors
]
)
except ClientError as e:
raise DeleteOldVersionsError(errors=[str(e)])