def lambda_handler()

in source/operators/rekognition/check_person_tracking_status.py [0:0]


def lambda_handler(event, context):
    print("We got the following event:\n", event)
    try:
        status = event["Status"]
        asset_id = event['MetaData']['AssetId']
    except KeyError as e:
        output_object.update_workflow_status("Error")
        output_object.add_workflow_metadata(PersonTrackingError="Missing key {e}".format(e=e))
        raise MasExecutionError(output_object.return_output_object())
    # Images will have already been processed, so return if job status is already set.
    if status == "Complete":
        output_object.update_workflow_status("Complete")
        return output_object.return_output_object()
    try:
        job_id = event["MetaData"]["JobId"]
        workflow_id = event["MetaData"]["WorkflowExecutionId"]
    except KeyError as e:
        output_object.update_workflow_status("Error")
        output_object.add_workflow_metadata(PersonTrackingError="Missing a required metadata key {e}".format(e=e))
        raise MasExecutionError(output_object.return_output_object())
    # Check rekognition job status:
    dataplane = DataPlane()
    pagination_token = ''
    is_paginated = False
    # If pagination token is in event["MetaData"] then use that to start
    # reading reko results from where this Lambda's previous invocation left off.
    if ("PageToken" in event["MetaData"]):
        pagination_token = event["MetaData"]["PageToken"]
        is_paginated = True
    # Read and persist 10 reko pages per invocation of this Lambda
    for page_number in range(11):
        # Get reko results
        print("job id: " + job_id + " page token: " + pagination_token)
        try:
            response = rek.get_person_tracking(JobId=job_id, NextToken=pagination_token)
        except rek.exceptions.InvalidPaginationTokenException as e:
            # Trying to reverse seek to the last valid pagination token would be difficult
            # to implement, so in the rare case that a pagination token expires we'll
            # just start over by reading from the first page.
            print(e)
            print("WARNING: Invalid pagination token found. Restarting read from first page.")
            pagination_token = ''
            continue
        # If the reko job is IN_PROGRESS then return. We'll check again after a step function wait.
        if response['JobStatus'] == "IN_PROGRESS":
            output_object.update_workflow_status("Executing")
            output_object.add_workflow_metadata(JobId=job_id, AssetId=asset_id, WorkflowExecutionId=workflow_id)
            return output_object.return_output_object()
        # If the reko job is FAILED then mark the workflow status as Error and return.
        elif response['JobStatus'] == "FAILED":
            output_object.update_workflow_status("Error")
            output_object.add_workflow_metadata(JobId=job_id, PersonTrackingError=str(response["StatusMessage"]))
            raise MasExecutionError(output_object.return_output_object())
        # If the reko job is SUCCEEDED then save this current reko page result
        # and continue to next page_number.
        elif response['JobStatus'] == "SUCCEEDED":
            # If reko results contain more pages then save this page and continue to the next page
            if 'NextToken' in response:
                is_paginated = True
                # Persist rekognition results (current page)
                metadata_upload = dataplane.store_asset_metadata(asset_id=asset_id, operator_name=operator_name, workflow_id=workflow_id, results=response, paginate=True, end=False)
                # If dataplane request succeeded then get the next pagination token and continue.
                if "Status" in metadata_upload and metadata_upload["Status"] == "Success":
                    # Log that this page has been successfully uploaded to the dataplane
                    print("Uploaded metadata for asset: {asset}, job {JobId}, page {page}".format(asset=asset_id, JobId=job_id, page=pagination_token))
                    # Get the next pagination token:
                    pagination_token = response['NextToken']
                    # In order to avoid Lambda timeouts, we're only going to persist 10 pages then
                    # pass the pagination token to the workflow metadata and let our step function
                    # invoker restart this Lambda. The pagination token allows this Lambda
                    # continue from where it left off.
                    if page_number == 10:
                        output_object.update_workflow_status("Executing")
                        output_object.add_workflow_metadata(PageToken=pagination_token, JobId=job_id, AssetId=asset_id, WorkflowExecutionId=workflow_id)
                        return output_object.return_output_object()
                # If dataplane request failed then mark workflow as failed
                else:
                    output_object.update_workflow_status("Error")
                    output_object.add_workflow_metadata(PersonTrackingError="Unable to upload metadata for asset: {asset}".format(asset=asset_id), JobId=job_id)
                    raise MasExecutionError(output_object.return_output_object())
            # If reko results contain no more pages then save this page and mark the stage complete
            else:
                # If we've been saving pages, then tell dataplane this is the last page
                if is_paginated:
                    metadata_upload = dataplane.store_asset_metadata(asset_id=asset_id, operator_name=operator_name, workflow_id=workflow_id, results=response, paginate=True, end=True)
                # If there is only one page then save to dataplane without dataplane options
                else:
                    metadata_upload = dataplane.store_asset_metadata(asset_id=asset_id, operator_name=operator_name, workflow_id=workflow_id, results=response)
                # If dataplane request succeeded then mark the stage complete
                if "Status" in metadata_upload and metadata_upload["Status"] == "Success":
                    print("Uploaded metadata for asset: {asset}".format(asset=asset_id))
                    output_object.add_workflow_metadata(JobId=job_id)
                    output_object.update_workflow_status("Complete")
                    return output_object.return_output_object()
                # If dataplane request failed then mark workflow as failed
                else:
                    output_object.update_workflow_status("Error")
                    output_object.add_workflow_metadata(PersonTrackingError="Unable to upload metadata for {asset}: {error}".format(asset=asset_id, error=metadata_upload))
                    output_object.add_workflow_metadata(JobId=job_id)
                    raise MasExecutionError(output_object.return_output_object())
        # If reko job failed then mark workflow as failed
        else:
            output_object.update_workflow_status("Error")
            output_object.add_workflow_metadata(PersonTrackingError="Unable to determine status")
            raise MasExecutionError(output_object.return_output_object())