def create_combined_tca_graphic()

in pca-server/src/pca/pca-aws-sf-process-turn-by-turn.py [0:0]


    def create_combined_tca_graphic(self):
        """
        Creates a combined graphic representing the following pieces of metadata:
        - Multiple stacked charts, one per speaker that exists in the call
        - Decibel level per second of the call
        - Sentiment markers across the call
        - Indication of interruptions across the call
        - Indication of non-talk time across the call

        The chart is rendered to a temporary PNG under TMP_DIR, uploaded to the
        configured output bucket under the "tcaImagery/" prefix, and the temporary
        file is then removed (also when the upload fails). The matplotlib figure is
        always closed before returning so repeated calls in one process do not
        accumulate open figures.

        :return: "s3://<bucket>/<key>" location of the combined chart graphic
        """

        def covered_seconds(periods):
            # Expand each millisecond range into every whole second it touches (inclusive)
            seconds = []
            for period in periods:
                first = int(period["BeginOffsetMillis"] / 1000)
                last = int(period["EndOffsetMillis"] / 1000)
                seconds.extend(range(first, last + 1))
            return seconds

        # Work out which channel each participant is on
        agent_channel = "spk_" + str(self.analytics_channel_map["AGENT"])
        caller_channel = "spk_" + str(self.analytics_channel_map["CUSTOMER"])

        # Work through each conversation turn, extracting timestamp/decibel values as we go.
        # Anything that is not on the agent channel is tracked as the caller.
        secsLoudAgent, dbLoudAgent = [], []
        secsLoudCaller, dbLoudCaller = [], []
        for segment in self.speechSegmentList:
            if segment.segmentSpeaker == agent_channel:
                seconds, decibels = secsLoudAgent, dbLoudAgent
            else:
                seconds, decibels = secsLoudCaller, dbLoudCaller
            first_second = int(segment.segmentStartTime)
            # Each segment has a loudness score per second or part second
            for offset, score in enumerate(segment.segmentLoudnessScores):
                seconds.append(first_second + offset)
                # A score can be None, which causes errors later, so treat it as silence
                decibels.append(0.0 if score is None else score)
        agentLoudness = {"Seconds": secsLoudAgent, "dB": dbLoudAgent}
        callerLoudness = {"Seconds": secsLoudCaller, "dB": dbLoudCaller}

        # Get some pointers into our transcription blocks
        characteristics = self.asr_output["ConversationCharacteristics"]
        talk_time = characteristics["TalkTime"]
        interruptions = characteristics["Interruptions"]
        quiet_times = characteristics["NonTalkTime"]

        # Be careful as there may just be one speaker in the Call Analytics output
        participants = talk_time["DetailsByParticipant"]
        if participants["AGENT"]["TotalTimeMillis"] == 0:
            haveAgent, haveCaller = False, True
        elif participants["CUSTOMER"]["TotalTimeMillis"] == 0:
            haveAgent, haveCaller = True, False
        else:
            haveAgent, haveCaller = True, True
        plotRows = 2 if (haveAgent and haveCaller) else 1

        # Work out our final talk "second" and loudest value across the speakers being
        # plotted, as all charts need to line up. Fall back to 0 instead of raising a
        # ValueError from max() when no loudness values were captured for them.
        plotted_seconds = (secsLoudAgent if haveAgent else []) + (secsLoudCaller if haveCaller else [])
        plotted_decibels = (dbLoudAgent if haveAgent else []) + (dbLoudCaller if haveCaller else [])
        final_second = max(plotted_seconds, default=0)
        max_decibel = max(plotted_decibels, default=0)

        # Add some headroom to our decibel limit to give space for "interruption" markers
        max_decibel_headroom = (int(max_decibel / 10) + 2) * 10

        # Create a dataset for interruptions, which needs to be in the background on both charts
        intSecs = []
        for entries in interruptions["InterruptionsByInterrupter"].values():
            intSecs.extend(covered_seconds(entries))
        intSegments = {"Seconds": intSecs, "dB": [max_decibel_headroom] * len(intSecs)}

        # Create a dataset for non-talk time, which needs to be in the background on both charts
        quietSecs = covered_seconds(quiet_times["Instances"])
        quietSegments = {"Seconds": quietSecs, "dB": [max_decibel_headroom] * len(quietSecs)}

        # Work out where the chart will be stored locally
        base_filename = f"{self.jsonOutputFilename.split('.json')[0]}.png"
        chart_filename = f"{TMP_DIR}/{base_filename}"

        # Either speaker may be missing, so we cannot assume this is a 2-row or 1-row plot.
        # With 2 rows "ax" is indexable (one entry per row); with 1 row it is a single axes object.
        fig, ax = plt.subplots(nrows=plotRows, ncols=1, figsize=(12, 2.5 * plotRows))
        try:
            if haveAgent:
                if haveCaller:
                    self.build_single_loudness_chart(ax[0], agentLoudness, intSegments, quietSegments,
                                                     self.speechSegmentList, final_second, max_decibel_headroom,
                                                     "Agent", agent_channel, False, True)
                    self.build_single_loudness_chart(ax[1], callerLoudness, intSegments, quietSegments,
                                                     self.speechSegmentList, final_second, max_decibel_headroom,
                                                     "Customer", caller_channel, True, False)
                else:
                    self.build_single_loudness_chart(ax, agentLoudness, intSegments, quietSegments,
                                                     self.speechSegmentList, final_second, max_decibel_headroom,
                                                     "Agent", agent_channel, True, True)
            elif haveCaller:
                self.build_single_loudness_chart(ax, callerLoudness, intSegments, quietSegments,
                                                 self.speechSegmentList, final_second, max_decibel_headroom,
                                                 "Customer", caller_channel, True, True)

            # Generate and store the chart locally
            fig.savefig(chart_filename, facecolor="aliceblue")
        finally:
            # pyplot holds on to every figure it creates until it is explicitly closed
            plt.close(fig)

        # Upload the graphic to S3, always removing the local file afterwards
        object_key = "tcaImagery/" + base_filename
        try:
            s3Client = boto3.client('s3')
            s3Client.upload_file(chart_filename, cf.appConfig[cf.CONF_S3BUCKET_OUTPUT], object_key)
        finally:
            pcacommon.remove_temp_file(chart_filename)

        # Return our S3 URL so that the UI can create signed URLs for browser rendering
        return f"s3://{cf.appConfig[cf.CONF_S3BUCKET_OUTPUT]}/{object_key}"