def lambda_handler()

in pca-server/src/pca/pca-transcribe-eventbridge.py [0:0]


def lambda_handler(event, context):
    """
    Resume a waiting PCA Step Functions task when a Transcribe job changes state.

    Invoked with an EventBridge event.  For the two supported event types
    ("Transcribe Job State Change" and "Call Analytics Job State Change") it:

    1. looks up the job's current status via the matching Transcribe API,
    2. reads the DynamoDB tracking entry keyed on (job name, API mode),
       retrying the read once after 5 seconds in case the job finished before
       the entry was written,
    3. deletes that tracking entry, and
    4. calls Step Functions ``send_task_success`` with the stored task state
       plus a ``transcribeStatus`` field.

    A job that FAILED with a failure reason starting with "Internal" is
    reported as "RETRY" (with ``retryCount`` incremented) until the stored
    ``retryCount`` reaches ``RETRY_LIMIT``.

    Unsupported event types, events without a ``detail-type``, and jobs with
    no tracking entry (e.g. Transcribe jobs started outside PCA) are ignored.

    :param event: EventBridge event; ``detail-type`` and ``detail`` are read.
    :param context: Lambda context object (unused).
    :return: Always ``{'statusCode': 200, 'body': '"Success."'}``.
    """
    # Our tracking table name is an environment variable
    ddb_tracking_table = os.environ["TableName"]

    # Mapping of event type to Transcribe API type, which defines the
    # Transcribe call method and tags to use when looking up the jobs status
    transcribe = boto3.client("transcribe")
    transcribe_api_map = {
        "Transcribe Job State Change": {
            "mode": cf.API_STANDARD,
            "eb_job_name": "TranscriptionJobName",
            "get_job_method": transcribe.get_transcription_job,
            "api_key": "TranscriptionJobName",
            "job_tag": "TranscriptionJob",
            "status_tag": "TranscriptionJobStatus"
        },
        "Call Analytics Job State Change": {
            "mode": cf.API_ANALYTICS,
            "eb_job_name": "JobName",
            "get_job_method": transcribe.get_call_analytics_job,
            "api_key": "CallAnalyticsJobName",
            "job_tag": "CallAnalyticsJob",
            "status_tag": "CallAnalyticsJobStatus"
        }
    }

    # Work out what Transcribe API mode this is - if it's an event type that we
    # don't support (or the event has no detail-type at all) then quietly exit
    api_map = transcribe_api_map.get(event.get("detail-type"))
    if api_map is not None:
        api_mode = api_map["mode"]

        # Lookup our job metadata and results, then find our DDB matching entry
        try:
            # Lookup the job status
            job_name = event["detail"][api_map["eb_job_name"]]
            get_job = api_map["get_job_method"]
            response = get_job(**{api_map["api_key"]: job_name})[api_map["job_tag"]]
            job_status = response[api_map["status_tag"]]

            # Read tracking entry between Transcribe job and its Step Function
            tracking_key = {"PKJobId": {"S": job_name}, "SKApiMode": {"S": api_mode}}
            ddb_client = boto3.client("dynamodb")
            tracking = ddb_client.get_item(Key=tracking_key, TableName=ddb_tracking_table)

            # It's unlikely, but if the job finished before its task token was
            # written then wait 5 seconds and try again - just once
            if "Item" not in tracking:
                time.sleep(5)
                tracking = ddb_client.get_item(Key=tracking_key, TableName=ddb_tracking_table)
        except Exception as err:
            # Best-effort: if the job status lookup or DDB read failed, record why
            # and carry on as though no matching tracking entry exists
            print(f"Unable to resolve tracking entry for event "
                  f"'{event.get('detail-type')}': {err!r}")
            tracking = {}

        # Did we have a result?
        if "Item" in tracking:
            # Delete entry in DDB table - there's no way we'll be processing this again
            ddb_client.delete_item(Key=tracking_key, TableName=ddb_tracking_table)

            # Extract the Step Functions task token and previous event status
            item = tracking["Item"]
            task_token = item["taskToken"]["S"]
            event_status = json.loads(item["taskState"]["S"])

            # A FAILED job may be a transient service failure, in which case we
            # want the workflow to re-try the job
            final_status = job_status
            if job_status == "FAILED":
                failure_reason = response.get("FailureReason") or ""
                if failure_reason.startswith("Internal"):
                    # Internal failure - retry up to RETRY_LIMIT times.  Note that
                    # retryCount is dropped from the state once the limit is reached
                    retry_count = event_status.pop("retryCount", 0)
                    if retry_count < RETRY_LIMIT:
                        event_status["retryCount"] = retry_count + 1
                        final_status = "RETRY"

            # All complete - continue our workflow with this status/retry count
            event_status["transcribeStatus"] = final_status
            sfn_client = boto3.client("stepfunctions")
            sfn_client.send_task_success(taskToken=task_token,
                                         output=json.dumps(event_status))

    # We're always positive... we'll get Transcribe events outside of PCA,
    # so if we didn't find our tracking entry then it shouldn't be an issue
    return {
        'statusCode': 200,
        'body': json.dumps('Success.')
    }