in pca-server/src/pca/pca-aws-sf-process-turn-by-turn.py
def create_turn_by_turn_segments(self, transcribe_job_filename):
"""
Creates a list of conversational turns, splitting up by speaker or if there's a noticeable pause in
conversation. Notes, this works differently for speaker-separated and channel-separated files. For speaker-
the lines are already separated by speaker, so we only worry about splitting up speaker pauses of more than 3
seconds, but for channel- we have to hunt gaps of 100ms across an entire channel, then sort segments from both
channels, then merge any together to ensure we keep to the 3-second pause; this way means that channel- files
are able to show interleaved speech where speakers are talking over one another. Once all of this is done
we inject sentiment into each segment.
"""
speechSegmentList = []
# Load in the JSON file for processing
json_filepath = Path(transcribe_job_filename)
with open(json_filepath.absolute(), "r", encoding="utf-8") as json_file:
    self.asr_output = json.load(json_file)
is_analytics_mode = (self.api_mode == cf.API_ANALYTICS)
# Decide on our operational mode and set the overall job language
if is_analytics_mode:
# We ignore speaker/channel mode on Analytics
isChannelMode = False
isSpeakerMode = False
else:
# Channel/Speaker-mode only relevant if not using analytics
isChannelMode = self.transcribeJobInfo["Settings"]["ChannelIdentification"]
isSpeakerMode = not isChannelMode
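# Loop state shared by the speaker- and channel-separated paths: the previous speaker and end time drive
# the segment-break decisions, and confidenceList always points at the current segment's per-word list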
lastSpeaker = ""
lastEndTime = 0.0
skipLeadingSpace = False
confidenceList = []
nextSpeechSegment = None
# Process a Speaker-separated non-Analytics file
if isSpeakerMode:
# A segment is a blob of pronunciation and punctuation by an individual speaker
for segment in self.asr_output["results"]["speaker_labels"]["segments"]:
# If there is content in the segment then pick out the time and speaker
if len(segment["items"]) > 0:
# Pick out our next data
nextStartTime = float(segment["start_time"])
nextEndTime = float(segment["end_time"])
nextSpeaker = self.generate_speaker_label(standard_ts_speaker=str(segment["speaker_label"]))
# If we've changed speaker, or there's a 3-second gap, create a new row
if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) >= 3.0):
nextSpeechSegment = SpeechSegment()
speechSegmentList.append(nextSpeechSegment)
nextSpeechSegment.segmentStartTime = nextStartTime
nextSpeechSegment.segmentSpeaker = nextSpeaker
skipLeadingSpace = True
confidenceList = []
nextSpeechSegment.segmentConfidence = confidenceList
nextSpeechSegment.segmentEndTime = nextEndTime
# Note the speaker and end time of this segment for the next iteration
lastSpeaker = nextSpeaker
lastEndTime = nextEndTime
# For each word in the segment...
for word in segment["items"]:
# Find this word's entry in the full results list, then take its highest-confidence alternative
pronunciations = list(filter(lambda x: x["type"] == "pronunciation", self.asr_output["results"]["items"]))
word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and x["end_time"] == word["end_time"], pronunciations))
try:
    result = sorted(word_result[-1]["alternatives"], key=lambda x: float(x["confidence"]))[-1]
    confidence = float(result["confidence"])
except (KeyError, ValueError):
    # Redacted words have no top-level confidence - it's buried in the redaction details
    result = word_result[-1]["alternatives"][0]
    confidence = float(result["redactions"][0]["confidence"])
# Write the word, and a leading space if this isn't the start of the segment
if skipLeadingSpace:
skipLeadingSpace = False
wordToAdd = result["content"]
else:
wordToAdd = " " + result["content"]
# If the next item is punctuation, add it to the current word
try:
word_result_index = self.asr_output["results"]["items"].index(word_result[0])
next_item = self.asr_output["results"]["items"][word_result_index + 1]
if next_item["type"] == "punctuation":
wordToAdd += next_item["alternatives"][0]["content"]
except IndexError:
pass
# Add word and confidence to the segment and to our overall stats
nextSpeechSegment.segmentText += wordToAdd
confidenceList.append({"Text": wordToAdd,
"Confidence": confidence,
"StartTime": float(word["start_time"]),
"EndTime": float(word["end_time"])})
self.numWordsParsed += 1
self.cummulativeWordAccuracy += confidence
# Process a Channel-separated file
elif isChannelMode:
# A channel contains all pronunciation and punctuation from a single speaker
for channel in self.asr_output["results"]["channel_labels"]["channels"]:
# If there is content in the channel then start processing it
if len(channel["items"]) > 0:
# We have the same speaker all the way through this channel
nextSpeaker = self.generate_speaker_label(standard_ts_speaker=str(channel["channel_label"]))
for word in channel["items"]:
# Pick out our next data from a 'pronunciation'
if word["type"] == "pronunciation":
nextStartTime = float(word["start_time"])
nextEndTime = float(word["end_time"])
# If we've changed speaker, or the same speaker has paused for
# more than 100ms, then start a new text segment
if (nextSpeaker != lastSpeaker) or\
((nextSpeaker == lastSpeaker) and ((nextStartTime - lastEndTime) > 0.1)):
nextSpeechSegment = SpeechSegment()
speechSegmentList.append(nextSpeechSegment)
nextSpeechSegment.segmentStartTime = nextStartTime
nextSpeechSegment.segmentSpeaker = nextSpeaker
skipLeadingSpace = True
confidenceList = []
nextSpeechSegment.segmentConfidence = confidenceList
nextSpeechSegment.segmentEndTime = nextEndTime
# Note the speaker and end time of this segment for the next iteration
lastSpeaker = nextSpeaker
lastEndTime = nextEndTime
# Find this word's entry in the channel's items, then take its highest-confidence alternative
pronunciations = list(filter(lambda x: x["type"] == "pronunciation", channel["items"]))
word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and x["end_time"] == word["end_time"], pronunciations))
try:
    result = sorted(word_result[-1]["alternatives"], key=lambda x: float(x["confidence"]))[-1]
    confidence = float(result["confidence"])
except (KeyError, ValueError):
    # Redacted words have no top-level confidence - it's buried in the redaction details
    result = word_result[-1]["alternatives"][0]
    confidence = float(result["redactions"][0]["confidence"])
# Write the word, and a leading space if this isn't the start of the segment
if skipLeadingSpace:
skipLeadingSpace = False
wordToAdd = result["content"]
else:
wordToAdd = " " + result["content"]
# If the next item is punctuation, add it to the current word
try:
word_result_index = channel["items"].index(word_result[0])
next_item = channel["items"][word_result_index + 1]
if next_item["type"] == "punctuation":
wordToAdd += next_item["alternatives"][0]["content"]
except IndexError:
pass
# Add word and confidence to the segment and to our overall stats
nextSpeechSegment.segmentText += wordToAdd
confidenceList.append({"Text": wordToAdd,
"Confidence": confidence,
"StartTime": float(word["start_time"]),
"EndTime": float(word["end_time"])})
self.numWordsParsed += 1
self.cummulativeWordAccuracy += confidence
# Sort the segments, as they are in channel-order and not speaker-order, then
# merge together turns from the same speaker that are very close together
speechSegmentList = sorted(speechSegmentList, key=lambda segment: segment.segmentStartTime)
speechSegmentList = self.merge_speaker_segments(speechSegmentList)
# Process a Call Analytics file
elif is_analytics_mode:
# Create our speaker mapping - we need consistent output like spk_0 | spk_1
# across all Transcribe API variants to help the UI render it all the same
for channel_def in self.transcribeJobInfo["ChannelDefinitions"]:
self.analytics_channel_map[channel_def["ParticipantRole"]] = channel_def["ChannelId"]
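# At this point the map typically looks like {"AGENT": 0, "CUSTOMER": 1}, with the roles and
# channel ids taken directly from the job's ChannelDefinitions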
# Lookup shortcuts
interrupts = self.asr_output["ConversationCharacteristics"]["Interruptions"]
# Each turn has already been processed by Transcribe, so the outputs are in order
for turn in self.asr_output["Transcript"]:
# Get our next speaker name
nextSpeaker = self.generate_speaker_label(analytics_ts_speaker=turn["ParticipantRole"])
# Setup the next speaker block
nextSpeechSegment = SpeechSegment()
speechSegmentList.append(nextSpeechSegment)
nextSpeechSegment.segmentStartTime = float(turn["BeginOffsetMillis"]) / 1000.0
nextSpeechSegment.segmentEndTime = float(turn["EndOffsetMillis"]) / 1000.0
nextSpeechSegment.segmentSpeaker = nextSpeaker
nextSpeechSegment.segmentText = turn["Content"]
nextSpeechSegment.segmentLoudnessScores = turn["LoudnessScores"]
confidenceList = []
nextSpeechSegment.segmentConfidence = confidenceList
skipLeadingSpace = True
# Check if this block is within an interruption block for the speaker
if turn["ParticipantRole"] in interrupts["InterruptionsByInterrupter"]:
for entry in interrupts["InterruptionsByInterrupter"][turn["ParticipantRole"]]:
if turn["BeginOffsetMillis"] == entry["BeginOffsetMillis"]:
nextSpeechSegment.segmentInterruption = True
# Process each word in this turn
for word in turn["Items"]:
# Pick out our next data from a 'pronunciation'
if word["Type"] == "pronunciation":
# Write the word, and a leading space if this isn't the start of the segment
if skipLeadingSpace:
skipLeadingSpace = False
wordToAdd = word["Content"]
else:
wordToAdd = " " + word["Content"]
# If the word is redacted then the word confidence is a bit more buried
if "Confidence" in word:
conf_score = float(word["Confidence"])
elif "Redaction" in word:
conf_score = float(word["Redaction"][0]["Confidence"])
# Add the word and confidence to this segment's list and to our overall stats
confidenceList.append({"Text": wordToAdd,
"Confidence": conf_score,
"StartTime": float(word["BeginOffsetMillis"]) / 1000.0,
"EndTime": float(word["BeginOffsetMillis"] / 1000.0)})
self.numWordsParsed += 1
self.cummulativeWordAccuracy += conf_score
else:
# Punctuation, needs to be added to the previous word
last_word = nextSpeechSegment.segmentConfidence[-1]
last_word["Text"] = last_word["Text"] + word["Content"]
# Record any issues detected
if "IssuesDetected" in turn:
for issue in turn["IssuesDetected"]:
# Grab the transcript offsets for the issue text
begin_offset = issue["CharacterOffsets"]["Begin"]
end_offset = issue["CharacterOffsets"]["End"]
next_issue = {"Text": nextSpeechSegment.segmentText[begin_offset:end_offset],
"BeginOffset": begin_offset,
"EndOffset": end_offset}
# Tag this one on to our segment list and the header list
nextSpeechSegment.segmentIssuesDetected.append(next_issue)
self.issues_detected.append(next_issue)
# Tag on the sentiment - analytics gives no per-turn sentiment score, so max out whichever
# polarity was detected, which effectively is 1.0 * COMPREHEND_SENTIMENT_SCALER
turn_sentiment = turn["Sentiment"]
if turn_sentiment == "POSITIVE":
nextSpeechSegment.segmentIsPositive = True
nextSpeechSegment.segmentPositive = 1.0
nextSpeechSegment.segmentSentimentScore = COMPREHEND_SENTIMENT_SCALER
elif turn_sentiment == "NEGATIVE":
nextSpeechSegment.segmentIsNegative = True
nextSpeechSegment.segmentNegative = 1.0
nextSpeechSegment.segmentSentimentScore = COMPREHEND_SENTIMENT_SCALER
# Inject sentiments into the segment list
self.extract_nlp(speechSegmentList)
# If we ended up with any matched simple entities then insert them,
# which we can do now that we have the sentence order
if self.simpleEntityMap != {}:
self.create_simple_entity_entries(speechSegmentList)
# Now set the overall call duration if we actually had any speech
if len(speechSegmentList) > 0:
self.duration = float(speechSegmentList[-1].segmentConfidence[-1]["EndTime"])
# Return our full turn-by-turn speaker segment list with sentiment
return speechSegmentList
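# Illustrative usage - a minimal sketch only: `TranscribeParser` and the results path are placeholder
# names, and the instance is assumed to already have api_mode, transcribeJobInfo etc. configured:
#   parser = TranscribeParser(...)
#   segments = parser.create_turn_by_turn_segments("/tmp/analytics-job-results.json")
#   for seg in segments:
#       print(seg.segmentSpeaker, seg.segmentStartTime, seg.segmentText)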