def delete_old_versions()

in backend/ecs_tasks/delete_files/s3.py [0:0]


def delete_old_versions(client, input_bucket, input_key, new_version):
    """Delete the versions and delete markers of an object listed after a marker.

    Lists ``Versions`` and ``DeleteMarkers`` for ``input_key`` in
    ``input_bucket`` with the listing position set to
    ``KeyMarker=input_key`` / ``VersionIdMarker=new_version``, then removes
    every returned entry belonging to ``input_key`` using batched
    ``delete_objects`` calls.

    Args:
        client: S3 client exposing ``list_object_versions`` and
            ``delete_objects``.
        input_bucket: Name of the bucket holding the object.
        input_key: Key of the object whose listed versions are deleted.
        new_version: Version id passed as ``VersionIdMarker`` for the listing.

    Raises:
        DeleteOldVersionsError: If ``delete_objects`` reports errors for any
            object version, or if a ``ClientError`` is raised while listing
            or deleting.
    """

    def _belongs_to_target(entry):
        # ``Prefix`` matching also returns keys that merely start with
        # ``input_key``; their version ids must not be paired with
        # ``input_key`` in the delete request. Entries without a "Key"
        # field are kept, as they were before this filter existed.
        return entry is not None and entry.get("Key", input_key) == input_key

    try:
        listed = list(
            paginate(
                client,
                client.list_object_versions,
                ["Versions", "DeleteMarkers"],
                Bucket=input_bucket,
                Prefix=input_key,
                VersionIdMarker=new_version,
                KeyMarker=input_key,
            )
        )
        # Each listed element is indexed as (version, delete marker); either
        # slot may be None.
        versions = [el[0] for el in listed if _belongs_to_target(el[0])]
        delete_markers = [el[1] for el in listed if _belongs_to_target(el[1])]
        versions.extend(delete_markers)
        sorted_versions = sorted(versions, key=lambda x: x["LastModified"])
        version_ids = [v["VersionId"] for v in sorted_versions]

        errors = []
        # Send at most 1000 object versions per delete_objects request.
        max_deletions = 1000
        for start in range(0, len(version_ids), max_deletions):
            batch = version_ids[start : start + max_deletions]
            resp = client.delete_objects(
                Bucket=input_bucket,
                Delete={
                    "Objects": [
                        {"Key": input_key, "VersionId": version_id}
                        for version_id in batch
                    ],
                    "Quiet": True,
                },
            )
            errors.extend(resp.get("Errors", []))

        if errors:
            raise DeleteOldVersionsError(
                errors=[
                    "Delete object {} version {} failed: {}".format(
                        err["Key"], err["VersionId"], err["Message"]
                    )
                    for err in errors
                ]
            )
    except ClientError as e:
        # Chain the original error so the underlying AWS failure stays visible.
        raise DeleteOldVersionsError(errors=[str(e)]) from e