in source/operators/comprehend/entities/get_entity_detection.py
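# Module-level setup assumed by the handler below -- a sketch, not the file's actual
# header. The helper classes are imported here from the MIE Lambda helper layer
# (MediaInsightsEngineLambdaHelper); adjust the module path if this project packages
# them differently. The boto3 clients are plain assumptions about how the file wires
# up AWS access.
import tarfile
from io import BytesIO

import boto3

from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper
from MediaInsightsEngineLambdaHelper import MasExecutionError
from MediaInsightsEngineLambdaHelper import DataPlane

comprehend = boto3.client('comprehend')
s3 = boto3.client('s3')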
def lambda_handler(event, context):
    print("We got this event:\n", event)
    operator_object = MediaInsightsOperationHelper(event)
    # Pull the job id recorded by the start operator; a missing key means the job was never started.
    try:
        job_id = operator_object.metadata["comprehend_entity_job_id"]
        asset_id = operator_object.asset_id
        workflow_id = operator_object.workflow_execution_id
        # If Comprehend wasn't run due to empty text input, then we're done
        if job_id == "Empty input --> empty output.":
            operator_object.update_workflow_status("Complete")
            return operator_object.return_output_object()
    except KeyError:
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(comprehend_error="No valid job id")
        raise MasExecutionError(operator_object.return_output_object())
    # Look up the entity detection job; the stored job id is used as the JobName filter.
    try:
        response = comprehend.list_entities_detection_jobs(
            Filter={
                'JobName': job_id,
            },
        )
    except Exception as e:
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(comprehend_error="Unable to get response from Comprehend: {e}".format(e=str(e)))
        raise MasExecutionError(operator_object.return_output_object())
    else:
        print(response)
        comprehend_status = response["EntitiesDetectionJobPropertiesList"][0]["JobStatus"]
        # Job still running: report progress and let the workflow poll again.
        if comprehend_status in ("SUBMITTED", "IN_PROGRESS"):
            operator_object.add_workflow_metadata(comprehend_entity_job_id=job_id)
            operator_object.update_workflow_status("Executing")
            return operator_object.return_output_object()
        elif comprehend_status == "COMPLETED":
            # Comprehend writes its results as a tar.gz archive in S3, e.g. a URI like
            # s3://<bucket>/<prefix>/output/output.tar.gz (hypothetical, for illustration);
            # split that URI into its bucket and key.
            output_uri = response["EntitiesDetectionJobPropertiesList"][0]["OutputDataConfig"]["S3Uri"]
            delimiter = '/'
            bucket = delimiter.join(output_uri.split(delimiter)[2:3])
            file_name = output_uri.split(delimiter)[-1]
            key = delimiter.join(output_uri.split(delimiter)[3:-1]) + '/' + file_name
            comprehend_tarball = read_from_s3(bucket, key)
            comprehend_data = {"LanguageCode": response['EntitiesDetectionJobPropertiesList'][0]['LanguageCode'], "Results": []}
            if comprehend_tarball["Status"] == "Success":
                # Unpack the archive in memory and collect each entity results file as text.
                input_bytes = comprehend_tarball["Object"]
                with tarfile.open(fileobj=BytesIO(input_bytes)) as tf:
                    for member in tf:
                        if member.isfile():
                            comprehend_data["Results"].append(tf.extractfile(member).read().decode('utf-8'))
                # Persist the entity results to the dataplane under the "entities" operator name.
                dataplane = DataPlane()
                metadata_upload = dataplane.store_asset_metadata(asset_id, "entities", workflow_id, comprehend_data)
                if "Status" not in metadata_upload:
                    operator_object.update_workflow_status("Error")
                    operator_object.add_workflow_metadata(
                        comprehend_error="Unable to store entity data {e}".format(e=metadata_upload))
                    raise MasExecutionError(operator_object.return_output_object())
                elif metadata_upload["Status"] == "Success":
                    operator_object.add_workflow_metadata(comprehend_entity_job_id=job_id, output_uri=output_uri)
                    operator_object.update_workflow_status("Complete")
                    return operator_object.return_output_object()
                else:
                    operator_object.update_workflow_status("Error")
                    operator_object.add_workflow_metadata(comprehend_error="Unable to store entity data {e}".format(e=metadata_upload))
                    raise MasExecutionError(operator_object.return_output_object())
            else:
                operator_object.update_workflow_status("Error")
                operator_object.add_workflow_metadata(comprehend_entity_job_id=job_id, comprehend_error="Could not retrieve output from S3: {e}".format(e=comprehend_tarball["Message"]))
                raise MasExecutionError(operator_object.return_output_object())
        else:
            # Any other job status (e.g. FAILED or STOPPED) is treated as an operator error.
            operator_object.update_workflow_status("Error")
            operator_object.add_workflow_metadata(comprehend_entity_job_id=job_id, comprehend_error="Comprehend returned a failed status: {e}".format(e=response["EntitiesDetectionJobPropertiesList"][0]["Message"]))
            raise MasExecutionError(operator_object.return_output_object())
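
# Sketch of the read_from_s3 helper used above (the real helper lives elsewhere in
# this operator package); its name and return shape are inferred from how
# lambda_handler calls it: {"Status": "Success", "Object": <bytes>} on success and
# {"Status": "Error", "Message": <reason>} on failure.
def read_from_s3(bucket, key):
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
    except Exception as e:
        return {"Status": "Error", "Message": str(e)}
    return {"Status": "Success", "Object": obj["Body"].read()}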