in backend/ecs_tasks/delete_files/main.py [0:0]
def execute(queue_url, message_body, receipt_handle):
    """Process one queue message: remove matching rows from an S3 object.

    Validates and parses the message, opens the referenced S3 object with a
    version-aware filesystem, removes the rows matched by the job manifest,
    uploads the result via ``save`` and checks version integrity with
    ``verify_object_versions_integrity``. Older versions are optionally
    deleted. On success the queue message is deleted and a deletion event is
    emitted.

    Exceptions raised while processing are reported through ``handle_error``
    (or ``handle_skip`` for an ignored HeadObject 404) instead of being
    propagated. The handler order matters: the first matching ``except``
    clause wins.

    :param queue_url: URL of the queue the message was received from.
    :param message_body: Raw JSON message body. It must provide the
        ``Columns``, ``Object``, ``JobId``, ``Format`` and ``Manifest`` keys,
        and may provide ``RoleArn``, ``DeleteOldVersions`` and
        ``IgnoreObjectNotFoundExceptions``.
    :param receipt_handle: Receipt handle used to build the queue message
        that is deleted on success or passed to the error/skip handlers.
    """
    logger.info("Message received")
    queue = get_queue(queue_url)
    msg = queue.Message(receipt_handle)
    try:
        # Parse and validate incoming message
        validate_message(message_body)
        body = json.loads(message_body)
        session = get_session(body.get("RoleArn"))
        client = session.client("s3")
        kms_client = session.client("kms")
        # "JobId" is not used below, but it is still looked up so a message
        # missing it raises KeyError just like the other required keys.
        cols, object_path, _job_id, file_format, manifest_object = itemgetter(
            "Columns", "Object", "JobId", "Format", "Manifest"
        )(body)
        input_bucket, input_key = parse_s3_url(object_path)
        validate_bucket_versioning(client, input_bucket)
        match_ids = build_matches(cols, manifest_object)
        s3 = s3fs.S3FileSystem(
            session=session,
            default_cache_type="none",
            requester_pays=True,
            default_fill_cache=False,
            version_aware=True,
        )
        # Download the object in-memory and convert to PyArrow NativeFile
        logger.info("Downloading and opening %s object in-memory", object_path)
        metadata = s3.metadata(object_path, refresh=True)
        with s3.open(object_path, "rb") as f:
            # Record the exact version that was opened; it is passed to save()
            # and to the integrity check below.
            source_version = f.version_id
            logger.info("Using object version %s as source", source_version)
            # Write new file in-memory
            compressed = object_path.endswith(".gz")
            is_encrypted = is_kms_cse_encrypted(metadata)
            input_file = decrypt(f, metadata, kms_client) if is_encrypted else f
            out_sink, stats = delete_matches_from_file(
                input_file, match_ids, file_format, compressed
            )
        if stats["DeletedRows"] == 0:
            # Reported through the ValueError handler below.
            raise ValueError(
                "The object {} was processed successfully but no rows required deletion".format(
                    object_path
                )
            )
        with pa.BufferReader(out_sink.getvalue()) as output_buf:
            if is_encrypted:
                output_buf, metadata = encrypt(output_buf, metadata, kms_client)
            logger.info("Uploading new object version to S3")
            new_version = save(
                s3,
                client,
                output_buf,
                input_bucket,
                input_key,
                metadata,
                source_version,
            )
        logger.info("New object version: %s", new_version)
        verify_object_versions_integrity(
            client, input_bucket, input_key, source_version, new_version
        )
        if body.get("DeleteOldVersions"):
            # Lazy %-style arguments: the message is only rendered if emitted.
            logger.info(
                "Deleting object %s versions older than version %s",
                input_key,
                new_version,
            )
            delete_old_versions(client, input_bucket, input_key, new_version)
        msg.delete()
        emit_deletion_event(body, stats)
    except (KeyError, ArrowException) as e:
        err_message = "Apache Arrow processing error: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except IOError as e:
        err_message = "Unable to retrieve object: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except MemoryError as e:
        err_message = "Insufficient memory to work on object: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except ClientError as e:
        ignore_error = False
        err_message = "ClientError: {}".format(str(e))
        if e.operation_name == "PutObjectAcl":
            err_message += ". Redacted object uploaded successfully but unable to restore WRITE ACL"
        if e.operation_name == "ListObjectVersions":
            err_message += ". Could not verify redacted object version integrity"
        if e.operation_name == "HeadObject":
            # botocore treats the "Error" entry as optional; indexing it
            # directly could raise KeyError here, and an exception raised
            # inside this handler would escape every handler of this try.
            error_code = (e.response or {}).get("Error", {}).get("Code")
            if error_code == "404":
                ignore_error = body.get("IgnoreObjectNotFoundExceptions", False)
        if ignore_error:
            skip_reason = "Ignored error: {}".format(err_message)
            handle_skip(msg, body, skip_reason)
        else:
            handle_error(msg, message_body, err_message)
    except ValueError as e:
        err_message = "Unprocessable message: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except DeleteOldVersionsError as e:
        err_message = "Unable to delete previous versions: {}".format(str(e))
        handle_error(msg, message_body, err_message)
    except IntegrityCheckFailedError as e:
        # The exception carries everything needed to roll back. A distinct
        # name avoids shadowing the S3 client bound in the try body.
        err_description, rollback_client, bucket, key, version_id = e.args
        err_message = "Object version integrity check failed: {}".format(
            err_description
        )
        handle_error(msg, message_body, err_message)
        rollback_object_version(
            rollback_client,
            bucket,
            key,
            version_id,
            on_error=lambda err: handle_error(
                None, "{}", err, "ObjectRollbackFailed", False
            ),
        )
    except Exception as e:
        err_message = "Unknown error during message processing: {}".format(str(e))
        handle_error(msg, message_body, err_message)