in CloudFormation/converter/app.py [0:0]
def lambda_handler(event, context):
    # Locate the Ground Truth output manifest that triggered this invocation.
    s3_event = event["Records"][0]["s3"]
    input_file = f"s3://{s3_event['bucket']['name']}/{s3_event['object']['key']}"
    gt_manifest_folder, gt_manifest_fname = input_file.rsplit("/", 1)
    # Derive the Comprehend output paths: strip the 8-character "manifest"
    # suffix (keeping the trailing dot) and append the new extension.
    data_file = gt_manifest_folder + "/comprehend/documents/" + gt_manifest_fname[:-8] + "txt"
    ann_file = gt_manifest_folder + "/comprehend/annotations/" + gt_manifest_fname[:-8] + "csv"
    print("input_file, data_file, ann_file =", (input_file, data_file, ann_file))
    # Add tags to output.manifest to track the conversion execution. The log
    # stream name goes through trs, a module-level translation table, because
    # raw names contain characters such as "[", "]", and "$" that S3 tag
    # values do not allow.
    add_tags(
        bucket=s3_event["bucket"]["name"],
        obj=s3_event["object"]["key"],
        tags={
            "lambda_req_id": context.aws_request_id,
            "lambda_log_group": context.log_group_name,
            "lambda_log_stream": context.log_stream_name.translate(trs),
        },
        s3_client=s3_client,
    )
    # Start conversions: stream the manifest from S3 and write the Comprehend
    # training corpus and annotations CSV back to S3 in a single pass.
    with fs.open(input_file, "r") as f_gt, fs.open(data_file, "w") as f_data, fs.open(ann_file, "w") as f_ann:
        datawriter = csv.writer(f_data)
        annwriter = csv.writer(f_ann)
        annwriter.writerow(["File", "Line", "Begin Offset", "End Offset", "Type"])
        ann_file_column = PurePath(data_file).name
        # Process each line in Ground Truth's output manifest: each JSON line
        # becomes one document line plus zero or more annotation rows.
        for index, jsonLine in enumerate(f_gt):
            source = GroundTruth2Comprehend.convert_to_dataset(jsonLine)
            datawriter.writerow([source])
            annotations = GroundTruth2Comprehend.convert_to_annotations(index, jsonLine, ann_file_column)
            for entry in annotations:
                annwriter.writerow(entry)
    # Return the derived paths, execution identifiers, and library versions
    # so the conversion is traceable end to end.
    return {
        "files": {"input_file": input_file, "data_file": data_file, "ann_file": ann_file},
        "lambda": {
            "lambda_req_id": context.aws_request_id,
            "lambda_log_group": context.log_group_name,
            "lambda_log_stream_raw": context.log_stream_name,
            "lambda_log_stream_trs": context.log_stream_name.translate(trs),
        },
        "metadata": {m.__name__: m.__version__ for m in (s3fs, boto3)},
    }
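
The handler leans on several module-level names that sit outside this excerpt: fs, s3_client, trs, add_tags, and GroundTruth2Comprehend. Below is a minimal sketch of that setup, assuming s3fs for streaming S3 reads and writes and boto3's put_object_tagging for tagging; the add_tags body and the GroundTruth2Comprehend import are assumptions for illustration, not the repo's exact code.

# Module-level setup assumed by lambda_handler (a sketch, not the repo's
# exact code).
import csv
from pathlib import PurePath

import boto3
import s3fs

import GroundTruth2Comprehend  # assumed import; provides convert_to_dataset/convert_to_annotations

fs = s3fs.S3FileSystem()
s3_client = boto3.client("s3")

# Map characters that S3 tag values disallow (e.g., "[", "]", "$") to "_".
trs = str.maketrans("[]$", "___")

def add_tags(bucket, obj, tags, s3_client):
    """Hypothetical helper: attach key/value tags to an S3 object.

    Note: put_object_tagging replaces the object's entire tag set.
    """
    s3_client.put_object_tagging(
        Bucket=bucket,
        Key=obj,
        Tagging={"TagSet": [{"Key": k, "Value": v} for k, v in tags.items()]},
    )

With setup like this in place, uploading an output.manifest to the watched prefix triggers the handler, and the returned metadata block pins down exactly which s3fs and boto3 versions performed the conversion.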