in python/ts-to-word.py [0:0]
def create_turn_by_turn_segments(data, cli_args):
    """
    This creates a list of per-turn speech segments based upon the transcript data. It has to work in three
    slightly different ways, as each operational mode from Transcribe outputs slightly different JSON structures.
    These modes are (a) Speaker-separated audio, (b) Channel-separated audio, and (c) Call Analytics audio

    :param data: JSON result data from Transcribe
    :param cli_args: CLI arguments used for this processing run
    :return: List of transcription speech segments
    """
    # Decide on our operational mode - it's in the job-status or, if necessary, infer it from the data file
    # STANDARD => speaker separated, channel separated; ANALYTICS => different format
    isAnalyticsMode = cli_args.analyticsMode
    if isAnalyticsMode:
        # We know if it's analytics mode, as it's defined in the job-status and file
        isChannelMode = False
        isSpeakerMode = False
    else:
        # Channel/Speaker-mode only relevant if not using analytics
        isChannelMode = "channel_labels" in data["results"]
        isSpeakerMode = not isChannelMode

    # Each operational mode has its own distinct JSON layout, so each gets its own builder
    if isSpeakerMode:
        speechSegmentList = _create_segments_speaker_mode(data)
    elif isChannelMode:
        speechSegmentList = _create_segments_channel_mode(data)
    elif isAnalyticsMode:
        speechSegmentList = _create_segments_analytics_mode(data)
    else:
        speechSegmentList = []

    # Return our full turn-by-turn speaker segment list (analytics segments include sentiment)
    return speechSegmentList


def _best_word_and_confidence(pronunciations, word):
    """
    Finds the pronunciation entries matching the given word's timestamps and picks out
    the alternative with the highest confidence score.

    :param pronunciations: Pre-filtered list of 'pronunciation'-type result items
    :param word: Word item whose start/end times identify the matching pronunciation
    :return: Tuple of (matching result items, best alternative, confidence score)
    """
    word_result = [item for item in pronunciations
                   if item["start_time"] == word["start_time"] and item["end_time"] == word["end_time"]]
    try:
        result = sorted(word_result[-1]["alternatives"], key=lambda alt: alt["confidence"])[-1]
        confidence = float(result["confidence"])
    except (KeyError, ValueError):
        # A redacted word carries no usable confidence on the alternative itself -
        # the score is buried inside the "redactions" structure instead
        result = word_result[-1]["alternatives"][0]
        confidence = float(result["redactions"][0]["confidence"])
    return word_result, result, confidence


def _append_trailing_punctuation(item_list, word_result, word_to_add):
    """
    If the item following the given word in the original results list is punctuation
    then append it to the word text.

    :param item_list: Original (unfiltered) list of result items to index into
    :param word_result: Matched result items for the current word
    :param word_to_add: Current word text to (possibly) extend
    :return: Word text, with any trailing punctuation appended
    """
    try:
        word_result_index = item_list.index(word_result[0])
        next_item = item_list[word_result_index + 1]
        if next_item["type"] == "punctuation":
            word_to_add += next_item["alternatives"][0]["content"]
    except IndexError:
        # This was the final item in the list - there's nothing following it
        pass
    return word_to_add


def _create_segments_speaker_mode(data):
    """
    Builds the speech segment list from a Speaker-separated (non-analytics) results file.

    :param data: JSON result data from Transcribe
    :return: List of transcription speech segments
    """
    speechSegmentList = []
    lastSpeaker = ""
    lastEndTime = 0.0
    skipLeadingSpace = False
    confidenceList = []
    nextSpeechSegment = None

    # The full item/pronunciation lists never change per word, so build them once
    # rather than re-filtering inside the word loop (previously O(n^2))
    allItems = data["results"]["items"]
    pronunciations = [item for item in allItems if item["type"] == "pronunciation"]

    # A segment is a blob of pronunciation and punctuation by an individual speaker
    for segment in data["results"]["speaker_labels"]["segments"]:
        # If there is content in the segment then pick out the time and speaker
        if len(segment["items"]) > 0:
            # Pick out our next data
            nextStartTime = float(segment["start_time"])
            nextEndTime = float(segment["end_time"])
            nextSpeaker = str(segment["speaker_label"])

            # If we've changed speaker, or there's a gap, create a new row
            if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) >= START_NEW_SEGMENT_DELAY):
                nextSpeechSegment = SpeechSegment()
                speechSegmentList.append(nextSpeechSegment)
                nextSpeechSegment.segmentStartTime = nextStartTime
                nextSpeechSegment.segmentSpeaker = nextSpeaker
                skipLeadingSpace = True
                confidenceList = []
                nextSpeechSegment.segmentConfidence = confidenceList
            nextSpeechSegment.segmentEndTime = nextEndTime

            # Note the speaker and end time of this segment for the next iteration
            lastSpeaker = nextSpeaker
            lastEndTime = nextEndTime

            # For each word in the segment...
            for word in segment["items"]:
                # Get the word with the highest confidence
                word_result, result, confidence = _best_word_and_confidence(pronunciations, word)

                # Write the word, and a leading space if this isn't the start of the segment
                if skipLeadingSpace:
                    skipLeadingSpace = False
                    wordToAdd = result["content"]
                else:
                    wordToAdd = " " + result["content"]

                # If the next item is punctuation, add it to the current word
                wordToAdd = _append_trailing_punctuation(allItems, word_result, wordToAdd)

                # Finally, add the word and confidence to this segment's list
                nextSpeechSegment.segmentText += wordToAdd
                confidenceList.append({"text": wordToAdd,
                                       "confidence": confidence,
                                       "start_time": float(word["start_time"]),
                                       "end_time": float(word["end_time"])})

    return speechSegmentList


def _create_segments_channel_mode(data):
    """
    Builds the speech segment list from a Channel-separated (non-analytics) results file.
    Segments are sorted into time order and adjacent same-speaker turns are merged.

    :param data: JSON result data from Transcribe
    :return: List of transcription speech segments
    """
    speechSegmentList = []
    lastSpeaker = ""
    lastEndTime = 0.0
    skipLeadingSpace = False
    confidenceList = []
    nextSpeechSegment = None

    # A channel contains all pronunciation and punctuation from a single speaker
    for channel in data["results"]["channel_labels"]["channels"]:
        # If there is content in the channel then start processing it
        if len(channel["items"]) > 0:
            # We have the same speaker all the way through this channel
            nextSpeaker = str(channel["channel_label"])

            # Hoisted out of the word loop - the channel's pronunciation list is fixed
            pronunciations = [item for item in channel["items"] if item["type"] == "pronunciation"]

            for word in channel["items"]:
                # Pick out our next data from a 'pronunciation'
                if word["type"] == "pronunciation":
                    nextStartTime = float(word["start_time"])
                    nextEndTime = float(word["end_time"])

                    # If we've changed speaker, or the gap since the last word is
                    # big enough (> 0.1s), then start a new text segment
                    if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) > 0.1):
                        nextSpeechSegment = SpeechSegment()
                        speechSegmentList.append(nextSpeechSegment)
                        nextSpeechSegment.segmentStartTime = nextStartTime
                        nextSpeechSegment.segmentSpeaker = nextSpeaker
                        skipLeadingSpace = True
                        confidenceList = []
                        nextSpeechSegment.segmentConfidence = confidenceList
                    nextSpeechSegment.segmentEndTime = nextEndTime

                    # Note the speaker and end time of this segment for the next iteration
                    lastSpeaker = nextSpeaker
                    lastEndTime = nextEndTime

                    # Get the word with the highest confidence
                    word_result, result, confidence = _best_word_and_confidence(pronunciations, word)

                    # Write the word, and a leading space if this isn't the start of the segment
                    if skipLeadingSpace:
                        skipLeadingSpace = False
                        wordToAdd = result["content"]
                    else:
                        wordToAdd = " " + result["content"]

                    # If the next item is punctuation, add it to the current word
                    wordToAdd = _append_trailing_punctuation(channel["items"], word_result, wordToAdd)

                    # Finally, add the word and confidence to this segment's list
                    nextSpeechSegment.segmentText += wordToAdd
                    confidenceList.append({"text": wordToAdd,
                                           "confidence": confidence,
                                           "start_time": float(word["start_time"]),
                                           "end_time": float(word["end_time"])})

    # Sort the segments, as they are in channel-order and not speaker-order, then
    # merge together turns from the same speaker that are very close together
    speechSegmentList = sorted(speechSegmentList, key=lambda segment: segment.segmentStartTime)
    return merge_speaker_segments(speechSegmentList)


def _create_segments_analytics_mode(data):
    """
    Builds the speech segment list from a Call Analytics results file, tagging each
    segment with interruption, detected-issue and sentiment information.

    :param data: JSON result data from Transcribe Call Analytics
    :return: List of transcription speech segments
    """
    speechSegmentList = []

    # Lookup shortcuts
    interrupts = data["ConversationCharacteristics"]["Interruptions"]

    # Each turn has already been processed by Transcribe, so the outputs are in order
    for turn in data["Transcript"]:
        # Setup the next speaker block
        nextSpeechSegment = SpeechSegment()
        speechSegmentList.append(nextSpeechSegment)
        nextSpeechSegment.segmentStartTime = float(turn["BeginOffsetMillis"]) / 1000.0
        nextSpeechSegment.segmentEndTime = float(turn["EndOffsetMillis"]) / 1000.0
        nextSpeechSegment.segmentSpeaker = turn["ParticipantRole"].title()
        nextSpeechSegment.segmentText = turn["Content"]
        nextSpeechSegment.segmentLoudnessScores = turn["LoudnessScores"]
        confidenceList = []
        nextSpeechSegment.segmentConfidence = confidenceList
        skipLeadingSpace = True

        # Check if this block is within an interruption block for the speaker
        if turn["ParticipantRole"] in interrupts["InterruptionsByInterrupter"]:
            for entry in interrupts["InterruptionsByInterrupter"][turn["ParticipantRole"]]:
                if turn["BeginOffsetMillis"] == entry["BeginOffsetMillis"]:
                    nextSpeechSegment.segmentInterruption = True

        # Record any issues detected
        if "IssuesDetected" in turn:
            for issue in turn["IssuesDetected"]:
                # Grab the transcript offsets for the issue text
                nextSpeechSegment.segmentIssuesDetected.append(issue["CharacterOffsets"])

        # Process each word in this turn
        for word in turn["Items"]:
            # Pick out our next data from a 'pronunciation'
            if word["Type"] == "pronunciation":
                # Write the word, and a leading space if this isn't the start of the segment
                if skipLeadingSpace:
                    skipLeadingSpace = False
                    wordToAdd = word["Content"]
                else:
                    wordToAdd = " " + word["Content"]

                # If the word is redacted then the word confidence is a bit more buried
                if "Confidence" in word:
                    conf_score = float(word["Confidence"])
                elif "Redaction" in word:
                    conf_score = float(word["Redaction"][0]["Confidence"])
                else:
                    # Neither field present - score as zero rather than crashing
                    # or silently re-using the previous word's score
                    conf_score = 0.0

                # Add the word and confidence to this segment's list.
                # Note: end_time is derived from EndOffsetMillis (the original code
                # mistakenly re-used BeginOffsetMillis, giving zero-length words)
                confidenceList.append({"text": wordToAdd,
                                       "confidence": conf_score,
                                       "start_time": float(word["BeginOffsetMillis"]) / 1000.0,
                                       "end_time": float(word["EndOffsetMillis"]) / 1000.0})
            else:
                # Punctuation, needs to be added to the previous word
                last_word = nextSpeechSegment.segmentConfidence[-1]
                last_word["text"] = last_word["text"] + word["Content"]

        # Tag on the sentiment - analytics has no per-turn numbers, so positive and
        # negative turns both record a full-strength (1.0) sentiment score
        turn_sentiment = turn["Sentiment"]
        if turn_sentiment == "POSITIVE":
            nextSpeechSegment.segmentIsPositive = True
            nextSpeechSegment.segmentPositive = 1.0
            nextSpeechSegment.segmentSentimentScore = 1.0
        elif turn_sentiment == "NEGATIVE":
            nextSpeechSegment.segmentIsNegative = True
            nextSpeechSegment.segmentNegative = 1.0
            nextSpeechSegment.segmentSentimentScore = 1.0

    return speechSegmentList