in src/annotation_consolidation_lambda.py [0:0]
def do_consolidation(payload, s3_client, labeling_job_arn, label_attribute_name, s3_ref):
    """Convert Ground Truth worker responses into the consolidated output format.

    Args:
        payload: List of per-data-object responses. Each item carries the
            worker "annotations" list, a "datasetObjectId", and the source
            "dataObject" with its "s3Uri".
        s3_client: boto3 S3 client used to read/write annotation files.
        labeling_job_arn: ARN of the labeling job; its 6th colon-separated
            field is recorded as "job-name" in the output metadata.
        label_attribute_name: Attribute name under which the consolidated
            annotation ref and its "-metadata" entry are stored.
        s3_ref: S3 reference/prefix used to build the annotation file path.

    Returns:
        List of consolidated-annotation dicts, one per successfully
        processed data object. Objects that raise are logged and skipped
        (best-effort), so the whole batch is never aborted by one failure.
    """
    consolidated_output = []
    for data_object_response in payload:
        try:
            # Worker annotations for a single document.
            annotation_list: List[dict] = data_object_response["annotations"]
            annotation_file_path = ""
            # Exactly one worker per labeling job, so only the first entry matters.
            if annotation_list:
                annotation_map = json.loads(
                    annotation_list[0]["annotationData"]["content"]
                )
                # Workers can return empty responses, hence the key check.
                if "document" in annotation_map:
                    file_name = os.path.split(
                        data_object_response["dataObject"]["s3Uri"]
                    )[1]
                    annotations = get_annotations(
                        annotation_map["document"], s3_client, file_name
                    )
                    annotation_file_path = get_annotation_file_path(
                        s3_ref, annotations.get("File", "")
                    )
                    # Persist the consolidated annotations to S3.
                    write_annotations(annotations, annotation_file_path, s3_client)
            # Emit an S3 reference rather than the inline annotation payload.
            # (The dict literal can never be None, so no None-check is needed.)
            consolidated_output.append(
                {
                    "datasetObjectId": data_object_response["datasetObjectId"],
                    "consolidatedAnnotation": {
                        "content": {
                            label_attribute_name: {
                                "annotation-ref": annotation_file_path
                            },
                            label_attribute_name + "-metadata": {
                                "job-name": labeling_job_arn.split(":")[5],
                                "type": "groundtruth/pdf-ner",
                                "creation-date": datetime.utcnow().isoformat(),
                                "human-annotated": "yes",
                            },
                        }
                    },
                }
            )
        except Exception as e:
            # Best-effort: skip the failing object so one bad response does
            # not abort consolidation of the whole batch.
            print(f'An Error occurred in do_consolidation function: {e}')
    return consolidated_output