in pca-server/src/pca/pca-aws-sf-process-turn-by-turn.py [0:0]
def parse_transcribe_file(self, sf_event):
    """
    Parses the output from the specified Transcribe job.

    Loads the job metadata, arranges an MP3 playback copy of the audio
    (preferring the redacted audio when redaction is enabled), downloads the
    job's JSON transcript, builds turn-by-turn speech segments with sentiment
    and entities, writes the parsed results JSON to the output S3 bucket and,
    if configured, indexes the transcript in Kendra.

    :param sf_event: Step Functions event payload; must contain "jobName"
    :return: filename of the parsed-results JSON object written to S3
    :raises AssertionError: if the Transcribe job cannot be found or has not
        yet reached COMPLETED status
    """
    # Load in the Amazon Transcribe job header information, ensuring that the job has completed.
    # Guards are raised explicitly (not via `assert`) so they survive `python -O`.
    transcribe = boto3.client("transcribe")
    job_name = sf_event["jobName"]
    try:
        job_status = self.load_transcribe_job_info(sf_event)
    except transcribe.exceptions.BadRequestException:
        raise AssertionError(f"Unable to load information for Transcribe job named '{job_name}'.") from None
    if job_status != "COMPLETED":
        raise AssertionError(f"Transcription job '{job_name}' has not yet completed.")

    # Create an MP3 playback file if we have to, using the redacted audio file if needed
    if ("RedactedMediaFileUri" in self.transcribeJobInfo["Media"]) and cf.isAudioRedactionEnabled():
        # Copy the redacted audio into the playback folder
        # TODO - Once the UI Lambda that plays the audio is changed to NOT assume that the redacted
        # TODO - audio is in the input bucket we can just set the playback URI to the audio location
        redacted_url = "s3://" + "/".join(self.transcribeJobInfo["Media"]["RedactedMediaFileUri"].split("/")[3:])
        s3_object = urlparse(redacted_url)
        s3_client = boto3.resource("s3")
        source = {"Bucket": s3_object.netloc, "Key": s3_object.path[1:]}
        dest_key = cf.appConfig[cf.CONF_PREFIX_MP3_PLAYBACK] + '/' + redacted_url.split('/')[-1]
        s3_client.meta.client.copy(source, cf.appConfig[cf.CONF_S3BUCKET_INPUT], dest_key)
        self.audioPlaybackUri = "s3://" + cf.appConfig[cf.CONF_S3BUCKET_INPUT] + "/" + dest_key
    else:
        # Just sort out the input file
        self.create_playback_mp3_audio(self.transcribeJobInfo["Media"]["MediaFileUri"])

    # Pick out the config parameters that we need
    outputS3Bucket = cf.appConfig[cf.CONF_S3BUCKET_OUTPUT]
    outputS3Key = cf.appConfig[cf.CONF_PREFIX_PARSED_RESULTS]

    # Parse Call GUID and Agent Name/ID from filename if possible
    self.set_guid(job_name)
    self.set_agent(job_name)

    # Work out the conversation time and set the language code
    self.calculate_transcribe_conversation_time(job_name)
    self.set_comprehend_language_code(self.transcribeJobInfo["LanguageCode"])

    # Download the job JSON results file to a local temp file - different Transcribe modes put
    # the files in different folder structures, so just strip everything past the bucket name
    self.jsonOutputFilename = self.transcript_uri.split("/")[-1]
    json_filepath = TMP_DIR + '/' + self.jsonOutputFilename
    transcriptResultsKey = "/".join(self.transcript_uri.split("/")[4:])

    # Now download - this has been known to get a "404 HeadObject Not Found",
    # which makes no sense, so if that happens then re-try in a sec. Only once.
    # (except Exception, not a bare except, so ^C / SystemExit are not swallowed)
    s3Client = boto3.client('s3')
    try:
        s3Client.download_file(outputS3Bucket, transcriptResultsKey, json_filepath)
    except Exception:
        time.sleep(3)
        s3Client.download_file(outputS3Bucket, transcriptResultsKey, json_filepath)

    # Before we process, let's load up any required simply entity map
    self.load_simple_entity_string_map()

    # Now create turn-by-turn diarisation, with associated sentiments and entities
    self.speechSegmentList = self.create_turn_by_turn_segments(json_filepath)

    # generate JSON results
    output = self.create_json_results()

    # Write out the JSON data to our S3 location (encode() already yields bytes)
    s3Resource = boto3.resource('s3')
    s3Object = s3Resource.Object(outputS3Bucket, outputS3Key + '/' + self.jsonOutputFilename)
    s3Object.put(
        Body=json.dumps(output).encode('UTF-8')
    )

    # Index transcript in Kendra, if transcript search is enabled
    kendraIndexId = cf.appConfig[cf.CONF_KENDRA_INDEX_ID]
    if (kendraIndexId != "None"):
        analysisUri = f"{cf.appConfig[cf.CONF_WEB_URI]}dashboard/parsedFiles/{self.jsonOutputFilename}"
        transcript_with_markers = prepare_transcript(json_filepath)
        conversationAnalytics = output["ConversationAnalytics"]
        put_kendra_document(kendraIndexId, analysisUri, conversationAnalytics, transcript_with_markers)

    # delete the local file
    pcacommon.remove_temp_file(json_filepath)

    # Return our filename for re-use later
    return self.jsonOutputFilename