def lambda_handler()

in source/operators/comprehend/key_phrases/get_key_phrases.py
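
This handler polls an asynchronous Amazon Comprehend key phrases detection job started by an upstream operator. While the job is SUBMITTED or IN_PROGRESS it reports "Executing" so the workflow polls again; on COMPLETED it downloads the job's output tarball from S3, extracts the results, and stores them in the dataplane under the "key_phrases" operator name.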


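The function relies on several module-level names that sit outside this excerpt. A minimal reconstruction, inferred from how the code uses them (the helper classes most likely come from MIE's shared MediaInsightsEngineLambdaHelper package; the Comprehend client name is an assumption):

import tarfile
from io import BytesIO

import boto3

from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper
from MediaInsightsEngineLambdaHelper import MasExecutionError
from MediaInsightsEngineLambdaHelper import DataPlane

# Assumed module-level client shared by the handler below.
comprehend = boto3.client('comprehend')
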
def lambda_handler(event, context):
    print("We got this event:\n", event)
    operator_object = MediaInsightsOperationHelper(event)
    try:
        job_id = operator_object.metadata["comprehend_phrases_job_id"]
        asset_id = operator_object.asset_id
        workflow_id = operator_object.workflow_execution_id
        # If Comprehend wasn't run due to empty text input, then we're done
        if job_id == "Empty input --> empty output.":
            operator_object.update_workflow_status("Complete")
            return operator_object.return_output_object()
    except KeyError:
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(comprehend_error="No valid job id")
        raise MasExecutionError(operator_object.return_output_object())
    try:
        # The stored job id is the Comprehend job name, so look the job up by JobName.
        response = comprehend.list_key_phrases_detection_jobs(
            Filter={
                'JobName': job_id,
            },
        )
    except Exception as e:
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(comprehend_error="Unable to get response from comprehend: {e}".format(e=e))
        raise MasExecutionError(operator_object.return_output_object())
    else:
        print(response)
        comprehend_status = response["KeyPhrasesDetectionJobPropertiesList"][0]["JobStatus"]
        if comprehend_status in ("SUBMITTED", "IN_PROGRESS"):
            operator_object.add_workflow_metadata(comprehend_phrases_job_id=job_id)
            operator_object.update_workflow_status("Executing")
            return operator_object.return_output_object()
        elif comprehend_status == "COMPLETED":
            output_uri = response["KeyPhrasesDetectionJobPropertiesList"][0]["OutputDataConfig"]["S3Uri"]
            delimiter = '/'
            # Parse the "s3://<bucket>/<key>" output URI into its bucket and key parts.
            bucket = output_uri.split(delimiter)[2]
            key = delimiter.join(output_uri.split(delimiter)[3:])
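            # Example (hypothetical values): given
            #   output_uri = "s3://my-mie-bucket/comprehend/123456789012-KP-abc123/output/output.tar.gz"
            # this yields
            #   bucket = "my-mie-bucket"
            #   key    = "comprehend/123456789012-KP-abc123/output/output.tar.gz"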
            comprehend_tarball = read_from_s3(bucket, key)
            comprehend_data = {
                "LanguageCode": response["KeyPhrasesDetectionJobPropertiesList"][0]["LanguageCode"],
                "Results": []
            }
            if comprehend_tarball["Status"] == "Success":
                input_bytes = comprehend_tarball["Object"]
                with tarfile.open(fileobj=BytesIO(input_bytes)) as tf:
                    # Each regular file in the Comprehend output tarball is a
                    # JSON Lines document of key phrase detection results.
                    for member in tf:
                        if member.isfile():
                            comprehend_data["Results"].append(tf.extractfile(member).read().decode('utf-8'))
                dataplane = DataPlane()
                metadata_upload = dataplane.store_asset_metadata(asset_id, "key_phrases", workflow_id, comprehend_data)
                if "Status" not in metadata_upload:
                    operator_object.update_workflow_status("Error")
                    operator_object.add_workflow_metadata(
                        comprehend_error="Unable to store key phrases data: {e}".format(e=metadata_upload))
                    raise MasExecutionError(operator_object.return_output_object())
                else:
                    if metadata_upload["Status"] == "Success":
                        operator_object.add_workflow_metadata(comprehend_phrases_job_id=job_id, output_uri=output_uri)
                        operator_object.update_workflow_status("Complete")
                        return operator_object.return_output_object()
                    else:
                        operator_object.update_workflow_status("Error")
                        operator_object.add_workflow_metadata(comprehend_error="Unable to store key phrases data {e}".format(e=metadata_upload))
                        raise MasExecutionError(operator_object.return_output_object())
            else:
                operator_object.update_workflow_status("Error")
                operator_object.add_workflow_metadata(comprehend_phrases_job_id=job_id, comprehend_error="Could not retrieve output from S3: {e}".format(e=comprehend_tarball["Message"]))
                raise MasExecutionError(operator_object.return_output_object())
        else:
            operator_object.update_workflow_status("Error")
            operator_object.add_workflow_metadata(comprehend_phrases_job_id=job_id, comprehend_error="Comprehend returned a failed job status: {e}".format(e=response["KeyPhrasesDetectionJobPropertiesList"][0]["Message"]))
            raise MasExecutionError(operator_object.return_output_object())
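
lambda_handler also calls a read_from_s3 helper defined elsewhere in this file. A minimal sketch of its contract, inferred from how the handler consumes its return value (a dict with "Status", plus raw bytes under "Object" on success or an error string under "Message" on failure); the s3 client and the exact error handling here are assumptions:

import boto3

s3 = boto3.client('s3')


def read_from_s3(bucket, key):
    # Return the object's raw bytes in the shape lambda_handler expects:
    # {"Status": "Success", "Object": bytes} or {"Status": "Error", "Message": str}.
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
    except Exception as e:
        return {"Status": "Error", "Message": str(e)}
    else:
        return {"Status": "Success", "Object": obj["Body"].read()}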