def execute()

in backend/ecs_tasks/delete_files/main.py [0:0]


def execute(queue_url, message_body, receipt_handle):
    """Process one deletion message: rewrite an S3 object without matched rows.

    The message body is validated and parsed as JSON, the referenced object is
    opened from S3, rows matching the manifest are removed via
    ``delete_matches_from_file``, and the result is saved back as a new object
    version. On success the queue message is deleted and a deletion event is
    emitted. Every exception raised inside the ``try`` block is caught here
    and routed to ``handle_error`` (or ``handle_skip`` for an ignorable
    missing-object error); none is re-raised by this function.

    Keys read from the parsed body:
        Required (looked up with ``itemgetter``): ``Columns``, ``Object``,
        ``JobId``, ``Format``, ``Manifest``.
        Optional (looked up with ``.get``): ``RoleArn``,
        ``DeleteOldVersions``, ``IgnoreObjectNotFoundExceptions``.

    Args:
        queue_url: Passed to ``get_queue`` to obtain the queue resource.
        message_body: Raw message payload; passed to ``validate_message`` and
            ``json.loads``, and forwarded unparsed to ``handle_error``.
        receipt_handle: Used to build the ``queue.Message`` handle that is
            deleted on success or handed to the error/skip handlers.

    Returns:
        None. Outcomes are reported through ``msg.delete``,
        ``emit_deletion_event``, ``handle_error`` and ``handle_skip``.

    Note:
        ``get_queue`` and ``queue.Message`` run outside the ``try`` block, so
        anything they raise propagates to the caller. The same is true of
        exceptions raised from within the ``except`` handlers themselves.
    """
    logger.info("Message received")
    queue = get_queue(queue_url)
    msg = queue.Message(receipt_handle)
    try:
        # Parse and validate incoming message
        validate_message(message_body)
        body = json.loads(message_body)
        # "RoleArn" is optional: a missing key passes None to get_session.
        session = get_session(body.get("RoleArn"))
        client = session.client("s3")
        kms_client = session.client("kms")
        # job_id is unpacked but not referenced again in this function; the
        # lookup still requires the "JobId" key to be present in the body.
        cols, object_path, job_id, file_format, manifest_object = itemgetter(
            "Columns", "Object", "JobId", "Format", "Manifest"
        )(body)
        input_bucket, input_key = parse_s3_url(object_path)
        validate_bucket_versioning(client, input_bucket)
        match_ids = build_matches(cols, manifest_object)
        # NOTE(review): version_aware=True is presumably what makes
        # f.version_id available below — confirm against the s3fs docs.
        s3 = s3fs.S3FileSystem(
            session=session,
            default_cache_type="none",
            requester_pays=True,
            default_fill_cache=False,
            version_aware=True,
        )
        # Download the object in-memory and convert to PyArrow NativeFile
        logger.info("Downloading and opening %s object in-memory", object_path)
        metadata = s3.metadata(object_path, refresh=True)
        with s3.open(object_path, "rb") as f:
            # Remember which version was read; it is passed to save() and to
            # the integrity check further down.
            source_version = f.version_id
            logger.info("Using object version %s as source", source_version)
            # Write new file in-memory
            compressed = object_path.endswith(".gz")
            is_encrypted = is_kms_cse_encrypted(metadata)
            input_file = decrypt(f, metadata, kms_client) if is_encrypted else f
            out_sink, stats = delete_matches_from_file(
                input_file, match_ids, file_format, compressed
            )
        # Nothing was removed, so no new version is written. The ValueError is
        # caught below and reported with the "Unprocessable message" prefix.
        if stats["DeletedRows"] == 0:
            raise ValueError(
                "The object {} was processed successfully but no rows required deletion".format(
                    object_path
                )
            )
        with pa.BufferReader(out_sink.getvalue()) as output_buf:
            if is_encrypted:
                # Rebinds both output_buf and metadata to the values returned
                # by encrypt(); the with statement still exits on the original
                # BufferReader it entered.
                output_buf, metadata = encrypt(output_buf, metadata, kms_client)
            logger.info("Uploading new object version to S3")
            new_version = save(
                s3,
                client,
                output_buf,
                input_bucket,
                input_key,
                metadata,
                source_version,
            )
        logger.info("New object version: %s", new_version)
        verify_object_versions_integrity(
            client, input_bucket, input_key, source_version, new_version
        )
        if body.get("DeleteOldVersions"):
            logger.info(
                "Deleting object {} versions older than version {}".format(
                    input_key, new_version
                )
            )
            delete_old_versions(client, input_bucket, input_key, new_version)
        # The message is deleted before the event is emitted, so a failure in
        # emit_deletion_event reaches the handlers below with an
        # already-deleted message.
        msg.delete()
        emit_deletion_event(body, stats)
    # NOTE(review): handler order is significant if ArrowException subclasses
    # also derive from IOError / MemoryError / ValueError (presumably the case
    # in pyarrow — confirm); such errors are reported by this first handler.
    # Any KeyError raised in the try block (e.g. a missing required body key
    # or a missing "DeletedRows" entry) is reported with the same Arrow prefix.
    except (KeyError, ArrowException) as e:
        err_message = "Apache Arrow processing error: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except IOError as e:
        err_message = "Unable to retrieve object: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except MemoryError as e:
        err_message = "Insufficient memory to work on object: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except ClientError as e:
        # Add operation-specific context, and optionally downgrade a 404 on
        # HeadObject from an error to a skip.
        ignore_error = False
        err_message = "ClientError: {}".format(str(e))
        if e.operation_name == "PutObjectAcl":
            err_message += ". Redacted object uploaded successfully but unable to restore WRITE ACL"
        if e.operation_name == "ListObjectVersions":
            err_message += ". Could not verify redacted object version integrity"
        # NOTE(review): relies on `body` already being bound, i.e. on
        # validate_message/json.loads never raising ClientError — confirm.
        if e.operation_name == "HeadObject" and e.response["Error"]["Code"] == "404":
            ignore_error = body.get("IgnoreObjectNotFoundExceptions", False)
        if ignore_error:
            skip_reason = "Ignored error: {}".format(err_message)
            # Unlike handle_error, handle_skip receives the parsed body.
            handle_skip(msg, body, skip_reason)
        else:
            handle_error(msg, message_body, err_message)
    # Reached for ValueErrors not already caught above, including the
    # "no rows required deletion" error raised in the try block.
    except ValueError as e:
        err_message = "Unprocessable message: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except DeleteOldVersionsError as e:
        err_message = "Unable to delete previous versions: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except IntegrityCheckFailedError as e:
        # The exception is expected to carry exactly five args; `client` is
        # rebound here from the exception rather than reusing the local one.
        err_description, client, bucket, key, version_id = e.args
        err_message = "Object version integrity check failed: {}".format(
            err_description
        )
        # Report the failure first, then attempt the rollback.
        handle_error(msg, message_body, err_message)
        # A rollback failure is reported through a second handle_error call
        # with no queue message and an empty JSON body.
        # NOTE(review): the trailing "ObjectRollbackFailed" and False are
        # presumably an event name and a flag — see handle_error's signature.
        rollback_object_version(
            client,
            bucket,
            key,
            version_id,
            on_error=lambda err: handle_error(
                None, "{}", err, "ObjectRollbackFailed", False
            ),
        )
    # Last-resort boundary: anything not matched above is reported, not raised.
    except Exception as e:
        err_message = "Unknown error during message processing: {}".format(str(e))
        handle_error(msg, message_body, err_message)