def create_turn_by_turn_segments()

in pca-server/src/pca/pca-aws-sf-process-turn-by-turn.py


    def create_turn_by_turn_segments(self, transcribe_job_filename):
        """
        Creates a list of conversational turns, splitting up by speaker or if there's a noticeable pause in
        conversation.  Notes, this works differently for speaker-separated and channel-separated files. For speaker-
        the lines are already separated by speaker, so we only worry about splitting up speaker pauses of more than 3
        seconds, but for channel- we have to hunt gaps of 100ms across an entire channel, then sort segments from both
        channels, then merge any together to ensure we keep to the 3-second pause; this way means that channel- files
        are able to show interleaved speech where speakers are talking over one another.  Once all of this is done
        we inject sentiment into each segment.
        """
        speechSegmentList = []

        # Load in the JSON file for processing
        json_filepath = Path(transcribe_job_filename)
        with open(json_filepath.absolute(), "r", encoding="utf-8") as json_file:
            self.asr_output = json.load(json_file)
        is_analytics_mode = (self.api_mode == cf.API_ANALYTICS)
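        # Note that the payload shape differs by API: a standard Transcribe job keeps its data
        # under "results" (with "items" plus "speaker_labels" or "channel_labels"), while a Call
        # Analytics job has a top-level "Transcript" list and "ConversationCharacteristics" block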

        # Decide on our operational mode and set the overall job language
        if is_analytics_mode:
            # We ignore speaker/channel mode on Analytics
            isChannelMode = False
            isSpeakerMode = False
        else:
            # Channel/Speaker-mode only relevant if not using analytics
            isChannelMode = self.transcribeJobInfo["Settings"]["ChannelIdentification"]
            isSpeakerMode = not isChannelMode

        lastSpeaker = ""
        lastEndTime = 0.0
        skipLeadingSpace = False
        confidenceList = []
        nextSpeechSegment = None

        # Process a Speaker-separated non-Analytics file
        if isSpeakerMode:
            # A segment is a blob of pronunciation and punctuation by an individual speaker
            for segment in self.asr_output["results"]["speaker_labels"]["segments"]:

                # If there is content in the segment then pick out the time and speaker
                if len(segment["items"]) > 0:
                    # Pick out our next data
                    nextStartTime = float(segment["start_time"])
                    nextEndTime = float(segment["end_time"])
                    nextSpeaker = self.generate_speaker_label(standard_ts_speaker=str(segment["speaker_label"]))

                    # If we've changed speaker, or there's a 3-second gap, create a new row
                    if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) >= 3.0):
                        nextSpeechSegment = SpeechSegment()
                        speechSegmentList.append(nextSpeechSegment)
                        nextSpeechSegment.segmentStartTime = nextStartTime
                        nextSpeechSegment.segmentSpeaker = nextSpeaker
                        skipLeadingSpace = True
                        confidenceList = []
                        nextSpeechSegment.segmentConfidence = confidenceList
                    nextSpeechSegment.segmentEndTime = nextEndTime

                    # Note the speaker and end time of this segment for the next iteration
                    lastSpeaker = nextSpeaker
                    lastEndTime = nextEndTime

                    # Pre-filter the full results item list down to just pronunciation entries once,
                    # rather than re-filtering it for every word in the loop below
                    pronunciations = list(filter(lambda x: x["type"] == "pronunciation", self.asr_output["results"]["items"]))

                    # For each word in the segment...
                    for word in segment["items"]:

                        # Get the word with the highest confidence, matching this word to
                        # its pronunciation entry by its start/end timestamps
                        word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and x["end_time"] == word["end_time"], pronunciations))
                        try:
                            result = sorted(word_result[-1]["alternatives"], key=lambda x: float(x["confidence"]))[-1]
                            confidence = float(result["confidence"])
                        except (KeyError, ValueError):
                            # Redacted words carry their confidence inside the redaction block
                            result = word_result[-1]["alternatives"][0]
                            confidence = float(result["redactions"][0]["confidence"])

                        # Write the word, and a leading space if this isn't the start of the segment
                        if skipLeadingSpace:
                            skipLeadingSpace = False
                            wordToAdd = result["content"]
                        else:
                            wordToAdd = " " + result["content"]

                        # If the next item is punctuation, add it to the current word
                        try:
                            word_result_index = self.asr_output["results"]["items"].index(word_result[0])
                            next_item = self.asr_output["results"]["items"][word_result_index + 1]
                            if next_item["type"] == "punctuation":
                                wordToAdd += next_item["alternatives"][0]["content"]
                        except IndexError:
                            pass

                        # Add word and confidence to the segment and to our overall stats
                        nextSpeechSegment.segmentText += wordToAdd
                        confidenceList.append({"Text": wordToAdd,
                                               "Confidence": confidence,
                                               "StartTime": float(word["start_time"]),
                                               "EndTime": float(word["end_time"])})
                        self.numWordsParsed += 1
                        self.cummulativeWordAccuracy += confidence

        # Process a Channel-separated file
        elif isChannelMode:

            # A channel contains all pronunciation and punctuation from a single speaker
            for channel in self.asr_output["results"]["channel_labels"]["channels"]:

                # If there is content in the channel then start processing it
                if len(channel["items"]) > 0:

                    # We have the same speaker all the way through this channel
                    nextSpeaker = self.generate_speaker_label(standard_ts_speaker=str(channel["channel_label"]))

                    # Pre-filter this channel's items down to just pronunciation entries once,
                    # rather than re-filtering them for every word in the loop below
                    pronunciations = list(filter(lambda x: x["type"] == "pronunciation", channel["items"]))
                    for word in channel["items"]:
                        # Pick out our next data from a 'pronunciation'
                        if word["type"] == "pronunciation":
                            nextStartTime = float(word["start_time"])
                            nextEndTime = float(word["end_time"])

                            # If we've changed speaker, or the pause on this channel
                            # exceeds 100ms, then start a new text segment
                            if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) > 0.1):
                                nextSpeechSegment = SpeechSegment()
                                speechSegmentList.append(nextSpeechSegment)
                                nextSpeechSegment.segmentStartTime = nextStartTime
                                nextSpeechSegment.segmentSpeaker = nextSpeaker
                                skipLeadingSpace = True
                                confidenceList = []
                                nextSpeechSegment.segmentConfidence = confidenceList
                            nextSpeechSegment.segmentEndTime = nextEndTime

                            # Note the speaker and end time of this segment for the next iteration
                            lastSpeaker = nextSpeaker
                            lastEndTime = nextEndTime

                            # Get the word with the highest confidence, matching this word to
                            # its pronunciation entry by its start/end timestamps
                            word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and x["end_time"] == word["end_time"], pronunciations))
                            try:
                                result = sorted(word_result[-1]["alternatives"], key=lambda x: float(x["confidence"]))[-1]
                                confidence = float(result["confidence"])
                            except (KeyError, ValueError):
                                # Redacted words carry their confidence inside the redaction block
                                result = word_result[-1]["alternatives"][0]
                                confidence = float(result["redactions"][0]["confidence"])

                            # Write the word, and a leading space if this isn't the start of the segment
                            if skipLeadingSpace:
                                skipLeadingSpace = False
                                wordToAdd = result["content"]
                            else:
                                wordToAdd = " " + result["content"]

                            # If the next item is punctuation, add it to the current word
                            try:
                                word_result_index = channel["items"].index(word_result[0])
                                next_item = channel["items"][word_result_index + 1]
                                if next_item["type"] == "punctuation":
                                    wordToAdd += next_item["alternatives"][0]["content"]
                            except IndexError:
                                pass

                            # Add word and confidence to the segment and to our overall stats
                            nextSpeechSegment.segmentText += wordToAdd
                            confidenceList.append({"Text": wordToAdd,
                                                   "Confidence": confidence,
                                                   "StartTime": float(word["start_time"]),
                                                   "EndTime": float(word["end_time"])})
                            self.numWordsParsed += 1
                            self.cummulativeWordAccuracy += confidence

            # Sort the segments, as they are in channel-order and not speaker-order, then
            # merge together turns from the same speaker that are very close together
            speechSegmentList = sorted(speechSegmentList, key=lambda segment: segment.segmentStartTime)
            speechSegmentList = self.merge_speaker_segments(speechSegmentList)

        # Process a Call Analytics file
        elif is_analytics_mode:

            # Create our speaker mapping - we need consistent output like spk_0 | spk_1
            # across all Transcribe API variants to help the UI render it all the same
            for channel_def in self.transcribeJobInfo["ChannelDefinitions"]:
                self.analytics_channel_map[channel_def["ParticipantRole"]] = channel_def["ChannelId"]
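                # e.g. a typical two-channel call maps to {"AGENT": 0, "CUSTOMER": 1}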

            # Lookup shortcuts
            interrupts = self.asr_output["ConversationCharacteristics"]["Interruptions"]

            # Each turn has already been processed by Transcribe, so the outputs are in order
            for turn in self.asr_output["Transcript"]:

                # Get our next speaker name
                nextSpeaker = self.generate_speaker_label(analytics_ts_speaker=turn["ParticipantRole"])

                # Setup the next speaker block
                nextSpeechSegment = SpeechSegment()
                speechSegmentList.append(nextSpeechSegment)
                nextSpeechSegment.segmentStartTime = float(turn["BeginOffsetMillis"]) / 1000.0
                nextSpeechSegment.segmentEndTime = float(turn["EndOffsetMillis"]) / 1000.0
                nextSpeechSegment.segmentSpeaker = nextSpeaker
                nextSpeechSegment.segmentText = turn["Content"]
                nextSpeechSegment.segmentLoudnessScores = turn["LoudnessScores"]
                confidenceList = []
                nextSpeechSegment.segmentConfidence = confidenceList
                skipLeadingSpace = True

                # Check if this block is within an interruption block for the speaker
                if turn["ParticipantRole"] in interrupts["InterruptionsByInterrupter"]:
                    for entry in interrupts["InterruptionsByInterrupter"][turn["ParticipantRole"]]:
                        if turn["BeginOffsetMillis"] == entry["BeginOffsetMillis"]:
                            nextSpeechSegment.segmentInterruption = True

                # Process each word in this turn
                for word in turn["Items"]:
                    # Pick out our next data from a 'pronunciation'
                    if word["Type"] == "pronunciation":
                        # Write the word, and a leading space if this isn't the start of the segment
                        if skipLeadingSpace:
                            skipLeadingSpace = False
                            wordToAdd = word["Content"]
                        else:
                            wordToAdd = " " + word["Content"]

                        # If the word is redacted then the word confidence is a bit more buried
                        if "Confidence" in word:
                            conf_score = float(word["Confidence"])
                        elif "Redaction" in word:
                            conf_score = float(word["Redaction"][0]["Confidence"])
                        else:
                            # Defensive default - every word should carry one of the two fields
                            conf_score = 0.0

                        # Add the word and confidence to this segment's list and to our overall stats
                        confidenceList.append({"Text": wordToAdd,
                                               "Confidence": conf_score,
                                               "StartTime": float(word["BeginOffsetMillis"]) / 1000.0,
                                               "EndTime": float(word["BeginOffsetMillis"] / 1000.0)})
                        self.numWordsParsed += 1
                        self.cummulativeWordAccuracy += conf_score

                    else:
                        # Punctuation, needs to be added to the previous word
                        last_word = nextSpeechSegment.segmentConfidence[-1]
                        last_word["Text"] = last_word["Text"] + word["Content"]

                # Record any issues detected
                if "IssuesDetected" in turn:
                    for issue in turn["IssuesDetected"]:
                        # Grab the transcript offsets for the issue text
                        begin_offset = issue["CharacterOffsets"]["Begin"]
                        end_offset = issue["CharacterOffsets"]["End"]
                        next_issue = {"Text": nextSpeechSegment.segmentText[begin_offset:end_offset],
                                      "BeginOffset": begin_offset,
                                      "EndOffset": end_offset}

                        # Tag this one on to our segment list and the header list
                        nextSpeechSegment.segmentIssuesDetected.append(next_issue)
                        self.issues_detected.append(next_issue)

                # Tag on the sentiment - analytics provides no per-turn scores, so we max out
                # the positive or negative value, which effectively is 1.0 * COMPREHEND_SENTIMENT_SCALER
                turn_sentiment = turn["Sentiment"]
                if turn_sentiment == "POSITIVE":
                    nextSpeechSegment.segmentIsPositive = True
                    nextSpeechSegment.segmentPositive = 1.0
                    nextSpeechSegment.segmentSentimentScore = COMPREHEND_SENTIMENT_SCALER
                elif turn_sentiment == "NEGATIVE":
                    nextSpeechSegment.segmentIsNegative = True
                    nextSpeechSegment.segmentNegative = 1.0
                    nextSpeechSegment.segmentSentimentScore = COMPREHEND_SENTIMENT_SCALER
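                # NEUTRAL and MIXED turns are left with their default (zero) sentiment scores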

        # Inject sentiments into the segment list
        self.extract_nlp(speechSegmentList)

        # If we ended up with any matched simple entities then insert them,
        # which we can do now that we have the sentence order
        if self.simpleEntityMap != {}:
            self.create_simple_entity_entries(speechSegmentList)

        # Now set the overall call duration if we actually had any speech
        if len(speechSegmentList) > 0:
            self.duration = float(speechSegmentList[-1].segmentConfidence[-1]["EndTime"])

        # Return our full turn-by-turn speaker segment list with sentiment
        return speechSegmentList
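

To make the channel-separated flow concrete, here is a minimal standalone sketch of the
split-sort-merge approach the docstring describes.  The data, helper name and structure are
illustrative only - nothing below comes from the PCA codebase itself.

    from dataclasses import dataclass

    @dataclass
    class Turn:
        speaker: str
        start: float
        end: float

    def split_sort_merge(channels, gap=0.1, pause=3.0):
        """channels maps a speaker label to a list of (start, end) word timings."""
        turns = []
        # 1. Split each channel wherever the gap between consecutive words exceeds 100ms
        for speaker, words in channels.items():
            current = None
            for start, end in words:
                if current is None or (start - current.end) > gap:
                    current = Turn(speaker, start, end)
                    turns.append(current)
                else:
                    current.end = end
        # 2. Sort the per-channel turns into conversational order
        turns.sort(key=lambda t: t.start)
        # 3. Merge adjacent same-speaker turns separated by less than the 3-second pause
        merged = turns[:1]
        for turn in turns[1:]:
            if turn.speaker == merged[-1].speaker and (turn.start - merged[-1].end) < pause:
                merged[-1].end = turn.end
            else:
                merged.append(turn)
        return merged

    # Overlapping speech from the two channels stays interleaved in the output:
    channels = {"spk_0": [(0.0, 0.4), (0.45, 1.0), (5.0, 5.5)],
                "spk_1": [(0.9, 1.3)]}
    print(split_sort_merge(channels))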