# pca-server/src/pca/pca-aws-sf-process-turn-by-turn.py
def create_combined_tca_graphic(self):
"""
Creates a combined graphic representing the following pieces of metadata:
- Multiple stacked charts, one per speaker that exists in the call
- Decibel level per second of the call
- Sentiment markers across the call
- Indication of interruptions across the call
- Indication of non-talk time across the call
:return: URL to the combined chart graphic
"""
image_url = ""
# Initialise our loudness structures
secsLoudAgent = []
dbLoudAgent = []
secsLoudCaller = []
dbLoudCaller = []
# Work out which channels our agent and caller are on
agent_channel = "spk_" + str(self.analytics_channel_map["AGENT"])
caller_channel = "spk_" + str(self.analytics_channel_map["CUSTOMER"])
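# These labels take the form "spk_0", "spk_1", etc., matching the segmentSpeaker value on each speech segment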
# Work through each conversation turn, extracting timestamp/decibel values as we go
for segment in self.speechSegmentList:
this_second = int(segment.segmentStartTime)
# Each segment has a loudness score per second or part second
for score in segment.segmentLoudnessScores:
# This can be set to None, which causes errors later if left unhandled
if score is None:
score = 0.0
# Track the Agent loudness
if segment.segmentSpeaker == agent_channel:
secsLoudAgent.append(this_second)
dbLoudAgent.append(score)
# Track the Caller loudness
else:
secsLoudCaller.append(this_second)
dbLoudCaller.append(score)
this_second += 1
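# Collect the per-speaker timestamp/decibel series into column-oriented structures ready for charting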
agentLoudness = {"Seconds": secsLoudAgent, "dB": dbLoudAgent}
callerLoudness = {"Seconds": secsLoudCaller, "dB": dbLoudCaller}
# Get some pointers into our transcription blocks
talk_time = self.asr_output["ConversationCharacteristics"]["TalkTime"]
interruptions = self.asr_output["ConversationCharacteristics"]["Interruptions"]
quiet_times = self.asr_output["ConversationCharacteristics"]["NonTalkTime"]
# Work out our final talk "second", as we need both charts to line up, but
# be careful as there may just be one speaker in the Call Analytics output
if talk_time["DetailsByParticipant"]["AGENT"]["TotalTimeMillis"] == 0:
final_second = max(secsLoudCaller)
max_decibel = max(dbLoudCaller)
haveAgent = False
haveCaller = True
plotRows = 1
elif talk_time["DetailsByParticipant"]["CUSTOMER"]["TotalTimeMillis"] == 0:
final_second = max(secsLoudAgent)
max_decibel = max(dbLoudAgent)
haveAgent = True
haveCaller = False
plotRows = 1
else:
final_second = max(max(secsLoudAgent), max(secsLoudCaller))
max_decibel = max(max(dbLoudAgent), max(dbLoudCaller))
haveAgent = True
haveCaller = True
plotRows = 2
# Add some headroom to our decibel limit to give space for "interruption" markers
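# (rounds the loudest value down to a multiple of 10, then adds 20, leaving 10-20 dB of space above the peak)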
max_decibel_headroom = (int(max_decibel / 10) + 2) * 10
# Create a dataset for interruptions, which needs to be in the background on both charts
intSecs = []
intDb = []
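# Each interruption is expanded into one full-height point per second, so it can be drawn
# as a background band on the charts; the non-talk dataset below is built the same way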
for speaker in interruptions["InterruptionsByInterrupter"]:
for entry in interruptions["InterruptionsByInterrupter"][speaker]:
start = int(entry["BeginOffsetMillis"] / 1000)
end = int(entry["EndOffsetMillis"] / 1000)
for second in range(start, end + 1):
intSecs.append(second)
intDb.append(max_decibel_headroom)
intSegments = {"Seconds": intSecs, "dB": intDb}
# Create a dataset for non-talk time, which needs to be in the background on both charts
quietSecs = []
quietdB = []
for quiet_period in quiet_times["Instances"]:
start = int(quiet_period["BeginOffsetMillis"] / 1000)
end = int(quiet_period["EndOffsetMillis"] / 1000)
for second in range(start, end + 1):
quietSecs.append(second)
quietdB.append(max_decibel_headroom)
quietSegments = {"Seconds": quietSecs, "dB": quietdB}
# Either speaker may be missing, so this may end up as either a 1-row or a 2-row figure
# Build one row per speaker present, with the interruption and non-talk markers in the background of each
fig, ax = plt.subplots(nrows=plotRows, ncols=1, figsize=(12, 2.5 * plotRows))
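# plt.subplots() returns a single Axes when plotRows == 1 and an array of Axes when
# plotRows == 2, hence the separate ax / ax[n] handling below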
if haveAgent:
if haveCaller:
self.build_single_loudness_chart(ax[0], agentLoudness, intSegments, quietSegments,
self.speechSegmentList, final_second, max_decibel_headroom, "Agent",
agent_channel, False, True)
self.build_single_loudness_chart(ax[1], callerLoudness, intSegments, quietSegments,
self.speechSegmentList, final_second, max_decibel_headroom, "Customer",
caller_channel, True, False)
else:
self.build_single_loudness_chart(ax, agentLoudness, intSegments, quietSegments, self.speechSegmentList,
final_second, max_decibel_headroom, "Agent", agent_channel, True, True)
elif haveCaller:
self.build_single_loudness_chart(ax, callerLoudness, intSegments, quietSegments, self.speechSegmentList,
final_second, max_decibel_headroom, "Customer", caller_channel, True, True)
# Generate and store the chart locally
base_filename = f"{self.jsonOutputFilename.split('.json')[0]}.png"
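# e.g. a "call.json" results filename would give a "call.png" chart filename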
chart_filename = f"{TMP_DIR}/{base_filename}"
fig.savefig(chart_filename, facecolor="aliceblue")
# Upload the graphic to S3
s3Client = boto3.client('s3')
object_key = "tcaImagery/" + base_filename
s3Client.upload_file(chart_filename, cf.appConfig[cf.CONF_S3BUCKET_OUTPUT], object_key)
# Remove the local file and return our S3 URL so that the UI can create signed URLs for browser rendering
pcacommon.remove_temp_file(chart_filename)
image_url = f"s3://{cf.appConfig[cf.CONF_S3BUCKET_OUTPUT]}/{object_key}"
return image_url