in pca-server/src/pca/pca-transcribe-eventbridge.py [0:0]
import json
import os
import time

import boto3

# Shared PCA configuration module, assumed to define the API-mode string
# constants cf.API_STANDARD and cf.API_ANALYTICS referenced below
import pcaconfiguration as cf

# Cap on re-drives of a job that failed with an internal Transcribe service
# error (the value here is an assumption; the constant lives in the full module)
RETRY_LIMIT = 2

def lambda_handler(event, context):
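    """
    Handles the EventBridge notification fired when an Amazon Transcribe or
    Transcribe Call Analytics job changes state. Illustrative incoming events
    (job names are hypothetical; only the fields read below are shown):

        {"detail-type": "Transcribe Job State Change",
         "detail": {"TranscriptionJobName": "example-pca-job"}}

        {"detail-type": "Call Analytics Job State Change",
         "detail": {"JobName": "example-pca-job"}}
    """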
# Our tracking table name is an environment variable
DDB_TRACKING_TABLE = os.environ["TableName"]
    # Mapping of EventBridge event type to Transcribe API mode, which defines
    # the Transcribe call method and tags to use when looking up the job's status
transcribe = boto3.client("transcribe")
TRANSCRIBE_API_MAP = {
"Transcribe Job State Change": {
"mode": cf.API_STANDARD,
"eb_job_name": "TranscriptionJobName",
"get_job_method": transcribe.get_transcription_job,
"api_key": "TranscriptionJobName",
"job_tag": "TranscriptionJob",
"status_tag": "TranscriptionJobStatus"
},
"Call Analytics Job State Change": {
"mode": cf.API_ANALYTICS,
"eb_job_name": "JobName",
"get_job_method": transcribe.get_call_analytics_job,
"api_key": "CallAnalyticsJobName",
"job_tag": "CallAnalyticsJob",
"status_tag": "CallAnalyticsJobStatus"
}
}
# Work out what Transcribe API mode this is - if it's
# an event type that we don't support then quietly exit
    if event["detail-type"] in TRANSCRIBE_API_MAP:
api_map = TRANSCRIBE_API_MAP[event["detail-type"]]
api_mode = api_map["mode"]
        # Look up our job metadata and results, then find our matching DDB entry
try:
            # Look up the job status via the mapped Transcribe API call
job_name = event["detail"][api_map["eb_job_name"]]
kwargs = {api_map["api_key"]: job_name}
            response = api_map["get_job_method"](**kwargs)[api_map["job_tag"]]
job_status = response[api_map["status_tag"]]
# Read tracking entry between Transcribe job and its Step Function
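            # The item is written when the Transcribe job is started; expected
            # shape (attribute names taken from the reads further down):
            #   {"PKJobId": {"S": "<job name>"}, "SKApiMode": {"S": "<api mode>"},
            #    "taskToken": {"S": "<Step Functions task token>"},
            #    "taskState": {"S": "<JSON-encoded task state>"}}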
ddbClient = boto3.client("dynamodb")
tracking = ddbClient.get_item(Key={'PKJobId': {'S': job_name}, 'SKApiMode': {'S': api_mode}},
TableName=DDB_TRACKING_TABLE)
            # It's unlikely, but a race condition is possible where the job
            # finishes before the task token has been written to the table - if
            # there's no item then wait five seconds and retry the read, just once
if "Item" not in tracking:
# Just sleep for a few seconds and try again
time.sleep(5)
tracking = ddbClient.get_item(Key={'PKJobId': {'S': job_name}, 'SKApiMode': {'S': api_mode}},
TableName=DDB_TRACKING_TABLE)
        except Exception:
# If the job status lookup failed (unlikely) or DDB threw an exception,
# then just say we didn't find a matching record and carry on
tracking = {}
# Did we have a result?
if "Item" in tracking:
# Delete entry in DDB table - there's no way we'll be processing this again
ddbClient.delete_item(Key={'PKJobId': {'S': job_name}, 'SKApiMode': {'S': api_mode}},
TableName=DDB_TRACKING_TABLE)
# Extract the Step Functions task and previous event status
            taskToken = tracking["Item"]["taskToken"]["S"]
            eventStatus = json.loads(tracking["Item"]["taskState"]["S"])
            # If the job has FAILED then check whether it was an internal service
            # failure - if so then we want to re-drive the job rather than give up
finalResponse = job_status
if job_status == "FAILED":
                errorMsg = response["FailureReason"]
                if errorMsg.startswith("Internal"):
                    # Internal service failure - retry, but only up to RETRY_LIMIT times
retryCount = eventStatus.pop("retryCount", 0)
# Not retried enough yet - let's try another time
                    if retryCount < RETRY_LIMIT:
eventStatus["retryCount"] = retryCount + 1
finalResponse = "RETRY"
# All complete - continue our workflow with this status/retry count
eventStatus["transcribeStatus"] = finalResponse
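            # eventStatus is now the original task state plus our additions; an
            # illustrative task output (hypothetical values) as seen by the
            # waiting state machine:
            #   { ...original taskState fields..., "retryCount": 1,
            #     "transcribeStatus": "RETRY" }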
sfnClient = boto3.client("stepfunctions")
sfnClient.send_task_success(taskToken=taskToken,
output=json.dumps(eventStatus))
    # We're always positive... Transcribe raises events for jobs outside of PCA,
    # so not finding our tracking entry isn't an error condition
return {
'statusCode': 200,
'body': json.dumps('Success.')
}
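

# A minimal local-test sketch, assuming valid AWS credentials, a tracking table
# named in the TableName environment variable, and a real Transcribe job whose
# state change we are simulating - the table and job names here are hypothetical
if __name__ == "__main__":
    os.environ.setdefault("TableName", "pca-tracking-table")  # hypothetical
    sample_event = {
        "detail-type": "Transcribe Job State Change",
        "detail": {"TranscriptionJobName": "example-pca-job"}  # hypothetical
    }
    print(lambda_handler(sample_event, None))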