in pca-server/src/pca/pca-aws-sf-start-transcribe-job.py [0:0]
def submitTranscribeJob(bucket, key, lang_code):
    '''
    Submits the supplied audio file to Transcribe, using the supplied language code. The method will decide
    whether to call the standard Transcribe APIs or the Call Analytics APIs

    @param bucket: Bucket holding the audio file to be transcribed
    @param key: Key for the audio file in the bucket
    @param lang_code: Configured language code for the audio file
    @return: Name of the transcription job, and the Transcribe API mode
    '''
    # Work out our API mode for Transcribe, and get our boto3 client
    transcribe = boto3.client('transcribe')
    api_mode, channel_ident = evaluate_transcribe_mode(bucket, key)
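    # api_mode selects between the standard and Call Analytics APIs, and channel_ident flags
    # whether the audio is channel-separated rather than requiring speaker diarization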
    # Generate job-name - delete if it already exists
    job_name = cf.generateJobName(key)
    current_job_status = check_existing_job_status(job_name, transcribe, api_mode)
    uri = 's3://' + bucket + '/' + key

    # If there's a job already running then the input file may have been copied - quit
    if (current_job_status == "IN_PROGRESS") or (current_job_status == "QUEUED"):
        # Return empty job name
        print("A Transcription job named '{}' is already in progress - cannot continue.".format(job_name))
        return ""
    elif current_job_status != "":
        # But if an old one exists we can delete it
        delete_existing_job(job_name, transcribe, api_mode)

    # Setup the structures common to both Standard and Call Analytics
    job_settings = {}
    media_settings = {
        'MediaFileUri': uri
    }

    # Double check that if we have a custom vocab that it actually exists
    if cf.appConfig[cf.CONF_VOCABNAME] != "":
        try:
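            # Vocabulary names are per-language: the configured base name plus the lowercase language code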
            vocab_name = cf.appConfig[cf.CONF_VOCABNAME] + '-' + lang_code.lower()
            our_vocab = transcribe.get_vocabulary(VocabularyName=vocab_name)
            if our_vocab["VocabularyState"] == "READY":
                # Only use it if it is ready for use
                job_settings["VocabularyName"] = vocab_name
        except:
            # Doesn't exist - don't use it
            pass

    # Double check that if we have a defined vocabulary filter that it exists
    try:
        vocab_filter_name = cf.appConfig[cf.CONF_FILTER_NAME] + '-' + lang_code.lower()
        transcribe.get_vocabulary_filter(VocabularyFilterName=vocab_filter_name)
        job_settings["VocabularyFilterMethod"] = cf.appConfig[cf.CONF_FILTER_MODE]
        job_settings["VocabularyFilterName"] = vocab_filter_name
    except:
        # Doesn't exist - don't use it
        pass

    # Get our role ARN from the environment and enable content redaction (if possible, and if wanted)
    role_arn = os.environ["RoleArn"]
    if cf.isTranscriptRedactionEnabled() and (lang_code in cf.appConfig[cf.CONF_REDACTION_LANGS]):
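        # 'redacted_and_unredacted' has Transcribe deliver both a redacted and an unredacted transcript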
        content_redaction = {'RedactionType': 'PII', 'RedactionOutput': 'redacted_and_unredacted'}
    else:
        content_redaction = None

    # Now sort out the mode-specific parameters
    if api_mode == cf.API_ANALYTICS:
        # CALL ANALYTICS JOB MODE - start with redaction and language
        if content_redaction is not None:
            job_settings["ContentRedaction"] = content_redaction
job_settings["LanguageOptions"] = [lang_code]
# Work out where our AGENT channel is - this will default to AGENT=0 if it can't work it out
conf_channels = [speaker_name.lower() for speaker_name in cf.appConfig[cf.CONF_SPEAKER_NAMES]]
if "agent" in conf_channels:
# Pick out the index, but if > 1 then we need to default to 0
agent_channel_number = conf_channels.index("agent")
if agent_channel_number > 1:
agent_channel_number = 0
else:
# No agent name defined - default to channel-0
agent_channel_number = 0
# Now build up or channel definitions
chan_def_agent = {'ChannelId': agent_channel_number, 'ParticipantRole': 'AGENT'}
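        # The customer takes the other channel - XOR with 1 flips channel 0 <-> 1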
        chan_def_cust = {'ChannelId': agent_channel_number ^ 1, 'ParticipantRole': 'CUSTOMER'}

        # Should have a clear run at doing the job now
        kwargs = {'CallAnalyticsJobName': job_name,
                  'Media': media_settings,
                  'OutputLocation': f"s3://{cf.appConfig[cf.CONF_S3BUCKET_OUTPUT]}/{cf.appConfig[cf.CONF_PREFIX_TRANSCRIBE_RESULTS]}/",
                  'DataAccessRoleArn': role_arn,
                  'Settings': job_settings,
                  'ChannelDefinitions': [chan_def_agent, chan_def_cust]
                  }

        # Start the Call Analytics job, removing any 'None' values on the way
        response = transcribe.start_call_analytics_job(
            **{k: v for k, v in kwargs.items() if v is not None}
        )
    else:
        # STANDARD TRANSCRIBE JOB MODE - start with some simple flags
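        # Exactly one of speaker diarization or channel identification is enabled, based on the audio type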
        job_settings['ShowSpeakerLabels'] = not channel_ident
        job_settings['ChannelIdentification'] = channel_ident

        # Some settings are only valid when speaker diarization (rather than channel identification) is in use
        if not channel_ident:
            job_settings["MaxSpeakerLabels"] = int(cf.appConfig[cf.CONF_MAX_SPEAKERS])

        # Job execution settings to allow queueing of standard Transcribe jobs
        execution_settings = {
            "AllowDeferredExecution": True,
            "DataAccessRoleArn": role_arn
        }

        # Should have a clear run at doing the job now
        kwargs = {'TranscriptionJobName': job_name,
                  'LanguageCode': lang_code,
                  'Media': media_settings,
                  'OutputBucketName': cf.appConfig[cf.CONF_S3BUCKET_OUTPUT],
                  'OutputKey': cf.appConfig[cf.CONF_PREFIX_TRANSCRIBE_RESULTS] + '/',
                  'Settings': job_settings,
                  'JobExecutionSettings': execution_settings,
                  'ContentRedaction': content_redaction
                  }

        # Start the Transcribe job, removing any 'None' values on the way
        response = transcribe.start_transcription_job(
            **{k: v for k, v in kwargs.items() if v is not None}
        )

    # Return our job name and api mode, as we need to track them
    return job_name, api_mode