def create_turn_by_turn_segments()

in python/ts-to-word.py [0:0]


def create_turn_by_turn_segments(data, cli_args):
    """
    This creates a list of per-turn speech segments based upon the transcript data.  It has to work in three
    slightly different ways, as each operational mode from Transcribe outputs slightly different JSON structures.
    These modes are (a) Speaker-separated audio, (b) Channel-separated audio, and (c) Call Analytics audio

    :param data: JSON result data from Transcribe
    :param cli_args: CLI arguments used for this processing run
    :return: List of transcription speech segments
    """
    # Decide on our operational mode - it's in the job-status or, if necessary, infer it from the data file
    # STANDARD => speaker separated, channel separated;  ANALYTICS => different format
    if cli_args.analyticsMode:
        # We know if its analytics mode, as it's defined in the job-status and file
        return _create_analytics_turn_segments(data)
    elif "channel_labels" in data["results"]:
        # Channel/Speaker-mode only relevant if not using analytics
        return _create_channel_turn_segments(data)
    else:
        return _create_speaker_turn_segments(data)


def _pick_best_alternative(word_result):
    """
    Picks out the highest-confidence alternative for a matched pronunciation item.  Redacted (PII)
    words have no usable top-level confidence score, as it is buried inside the redaction block,
    so fall back to that if the standard lookup fails.

    :param word_result: List of pronunciation items matching one word's start/end timestamps
    :return: Tuple of (chosen alternative dict, word confidence as a float)
    """
    try:
        result = sorted(word_result[-1]["alternatives"], key=lambda x: x["confidence"])[-1]
        confidence = float(result["confidence"])
    except (KeyError, ValueError):
        # Redacted word - the confidence lives inside the redaction sub-structure
        result = word_result[-1]["alternatives"][0]
        confidence = float(result["redactions"][0]["confidence"])
    return result, confidence


def _append_trailing_punctuation(word_to_add, item_list, word_result):
    """
    If the item that follows the given word in the source item list is punctuation then append
    it to the word's text, as punctuation is never treated as a standalone word.

    :param word_to_add: Word text assembled so far (with any leading space already applied)
    :param item_list: Full Transcribe item list that the word was matched against
    :param word_result: List of pronunciation items matching the word's timestamps
    :return: Word text with any trailing punctuation appended
    """
    try:
        word_result_index = item_list.index(word_result[0])
        next_item = item_list[word_result_index + 1]
        if next_item["type"] == "punctuation":
            word_to_add += next_item["alternatives"][0]["content"]
    except IndexError:
        # This was the final item in the transcript - nothing can follow it
        pass
    return word_to_add


def _create_speaker_turn_segments(data):
    """
    Creates the turn-by-turn segment list for a Speaker-separated non-analytics file.

    :param data: JSON result data from Transcribe
    :return: List of transcription speech segments
    """
    speechSegmentList = []
    lastSpeaker = ""
    lastEndTime = 0.0
    skipLeadingSpace = False
    confidenceList = []
    nextSpeechSegment = None

    # The pronunciation list never changes, so build it once up front rather than
    # re-filtering the entire transcript item list for every single word
    all_items = data["results"]["items"]
    pronunciations = list(filter(lambda x: x["type"] == "pronunciation", all_items))

    # A segment is a blob of pronunciation and punctuation by an individual speaker
    for segment in data["results"]["speaker_labels"]["segments"]:

        # If there is content in the segment then pick out the time and speaker
        if len(segment["items"]) > 0:
            # Pick out our next data
            nextStartTime = float(segment["start_time"])
            nextEndTime = float(segment["end_time"])
            nextSpeaker = str(segment["speaker_label"])

            # If we've changed speaker, or there's a gap, create a new row
            if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) >= START_NEW_SEGMENT_DELAY):
                nextSpeechSegment = SpeechSegment()
                speechSegmentList.append(nextSpeechSegment)
                nextSpeechSegment.segmentStartTime = nextStartTime
                nextSpeechSegment.segmentSpeaker = nextSpeaker
                skipLeadingSpace = True
                confidenceList = []
                nextSpeechSegment.segmentConfidence = confidenceList
            nextSpeechSegment.segmentEndTime = nextEndTime

            # Note the speaker and end time of this segment for the next iteration
            lastSpeaker = nextSpeaker
            lastEndTime = nextEndTime

            # For each word in the segment...
            for word in segment["items"]:

                # Match this word to the main item list by its timestamps, then take the
                # alternative with the highest confidence
                word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and
                                                    x["end_time"] == word["end_time"], pronunciations))
                result, confidence = _pick_best_alternative(word_result)

                # Write the word, and a leading space if this isn't the start of the segment
                if skipLeadingSpace:
                    skipLeadingSpace = False
                    wordToAdd = result["content"]
                else:
                    wordToAdd = " " + result["content"]

                # If the next item is punctuation, add it to the current word
                wordToAdd = _append_trailing_punctuation(wordToAdd, all_items, word_result)

                # Finally, add the word and confidence to this segment's list
                nextSpeechSegment.segmentText += wordToAdd
                confidenceList.append({"text": wordToAdd,
                                       "confidence": confidence,
                                       "start_time": float(word["start_time"]),
                                       "end_time": float(word["end_time"])})

    return speechSegmentList


def _create_channel_turn_segments(data):
    """
    Creates the turn-by-turn segment list for a Channel-separated non-analytics file.

    :param data: JSON result data from Transcribe
    :return: List of transcription speech segments, sorted into time order and merged
    """
    speechSegmentList = []
    lastSpeaker = ""
    lastEndTime = 0.0
    skipLeadingSpace = False
    confidenceList = []
    nextSpeechSegment = None

    # A channel contains all pronunciation and punctuation from a single speaker
    for channel in data["results"]["channel_labels"]["channels"]:

        # If there is content in the channel then start processing it
        if len(channel["items"]) > 0:

            # We have the same speaker all the way through this channel, and the channel's
            # pronunciation list doesn't change, so build it once outside the word loop
            nextSpeaker = str(channel["channel_label"])
            pronunciations = list(filter(lambda x: x["type"] == "pronunciation", channel["items"]))

            for word in channel["items"]:
                # Pick out our next data from a 'pronunciation'
                if word["type"] == "pronunciation":
                    nextStartTime = float(word["start_time"])
                    nextEndTime = float(word["end_time"])

                    # If we've changed speaker, or we haven't and the
                    # pause is very small, then start a new text segment
                    if (nextSpeaker != lastSpeaker) or\
                            ((nextSpeaker == lastSpeaker) and ((nextStartTime - lastEndTime) > 0.1)):
                        nextSpeechSegment = SpeechSegment()
                        speechSegmentList.append(nextSpeechSegment)
                        nextSpeechSegment.segmentStartTime = nextStartTime
                        nextSpeechSegment.segmentSpeaker = nextSpeaker
                        skipLeadingSpace = True
                        confidenceList = []
                        nextSpeechSegment.segmentConfidence = confidenceList
                    nextSpeechSegment.segmentEndTime = nextEndTime

                    # Note the speaker and end time of this segment for the next iteration
                    lastSpeaker = nextSpeaker
                    lastEndTime = nextEndTime

                    # Match this word by its timestamps, then take the highest-confidence alternative
                    word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and
                                                        x["end_time"] == word["end_time"], pronunciations))
                    result, confidence = _pick_best_alternative(word_result)

                    # Write the word, and a leading space if this isn't the start of the segment
                    if skipLeadingSpace:
                        skipLeadingSpace = False
                        wordToAdd = result["content"]
                    else:
                        wordToAdd = " " + result["content"]

                    # If the next item is punctuation, add it to the current word
                    wordToAdd = _append_trailing_punctuation(wordToAdd, channel["items"], word_result)

                    # Finally, add the word and confidence to this segment's list
                    nextSpeechSegment.segmentText += wordToAdd
                    confidenceList.append({"text": wordToAdd,
                                           "confidence": confidence,
                                           "start_time": float(word["start_time"]),
                                           "end_time": float(word["end_time"])})

    # Sort the segments, as they are in channel-order and not speaker-order, then
    # merge together turns from the same speaker that are very close together
    speechSegmentList = sorted(speechSegmentList, key=lambda segment: segment.segmentStartTime)
    speechSegmentList = merge_speaker_segments(speechSegmentList)

    return speechSegmentList


def _create_analytics_turn_segments(data):
    """
    Creates the turn-by-turn segment list for a Call Analytics file, tagging on
    interruption, detected-issue and sentiment data per turn.

    :param data: JSON result data from Transcribe
    :return: List of transcription speech segments
    """
    speechSegmentList = []

    # Lookup shortcuts
    interrupts = data["ConversationCharacteristics"]["Interruptions"]

    # Each turn has already been processed by Transcribe, so the outputs are in order
    for turn in data["Transcript"]:

        # Setup the next speaker block
        nextSpeechSegment = SpeechSegment()
        speechSegmentList.append(nextSpeechSegment)
        nextSpeechSegment.segmentStartTime = float(turn["BeginOffsetMillis"]) / 1000.0
        nextSpeechSegment.segmentEndTime = float(turn["EndOffsetMillis"]) / 1000.0
        nextSpeechSegment.segmentSpeaker = turn["ParticipantRole"].title()
        nextSpeechSegment.segmentText = turn["Content"]
        nextSpeechSegment.segmentLoudnessScores = turn["LoudnessScores"]
        confidenceList = []
        nextSpeechSegment.segmentConfidence = confidenceList
        skipLeadingSpace = True

        # Check if this block is within an interruption block for the speaker
        if turn["ParticipantRole"] in interrupts["InterruptionsByInterrupter"]:
            for entry in interrupts["InterruptionsByInterrupter"][turn["ParticipantRole"]]:
                if turn["BeginOffsetMillis"] == entry["BeginOffsetMillis"]:
                    nextSpeechSegment.segmentInterruption = True

        # Record any issues detected
        if "IssuesDetected" in turn:
            for issue in turn["IssuesDetected"]:
                # Grab the transcript offsets for the issue text
                nextSpeechSegment.segmentIssuesDetected.append(issue["CharacterOffsets"])

        # Process each word in this turn
        for word in turn["Items"]:
            # Pick out our next data from a 'pronunciation'
            if word["Type"] == "pronunciation":
                # Write the word, and a leading space if this isn't the start of the segment
                if skipLeadingSpace:
                    skipLeadingSpace = False
                    wordToAdd = word["Content"]
                else:
                    wordToAdd = " " + word["Content"]

                # If the word is redacted then the word confidence is a bit more buried.
                # Using if/else (not if/elif) ensures a malformed word fails loudly on the
                # missing key rather than silently reusing the previous word's score
                if "Confidence" in word:
                    conf_score = float(word["Confidence"])
                else:
                    conf_score = float(word["Redaction"][0]["Confidence"])

                # Add the word and confidence to this segment's list.  Note that the
                # end_time must come from EndOffsetMillis, not BeginOffsetMillis
                confidenceList.append({"text": wordToAdd,
                                       "confidence": conf_score,
                                       "start_time": float(word["BeginOffsetMillis"]) / 1000.0,
                                       "end_time": float(word["EndOffsetMillis"]) / 1000.0})
            else:
                # Punctuation, needs to be added to the previous word
                last_word = nextSpeechSegment.segmentConfidence[-1]
                last_word["text"] = last_word["text"] + word["Content"]

        # Tag on the sentiment - analytics has no per-turn numbers, so the magnitude is
        # always 1.0 and the polarity is carried by the positive/negative flags
        turn_sentiment = turn["Sentiment"]
        if turn_sentiment == "POSITIVE":
            nextSpeechSegment.segmentIsPositive = True
            nextSpeechSegment.segmentPositive = 1.0
            nextSpeechSegment.segmentSentimentScore = 1.0
        elif turn_sentiment == "NEGATIVE":
            nextSpeechSegment.segmentIsNegative = True
            nextSpeechSegment.segmentNegative = 1.0
            nextSpeechSegment.segmentSentimentScore = 1.0

    # Return our full turn-by-turn speaker segment list with sentiment
    return speechSegmentList