def processContactLensConversationCharacteristics()

in sam-app/lambda_functions/sfContactLensUtil.py [0:0]


def _classifyCustomerSentimentCurve(quarterScores):
    """Label the shape of the customer's sentiment over the call.

    The second and third scores are averaged into one mid-call value, which
    is compared with the first and the fourth score:

    * 'S' - the first score is at least 1 below the mid-call value and the
      mid-call value is below the fourth score (sentiment rises).
    * 'Z' - the first score is at least 1 above the mid-call value and the
      mid-call value is above the fourth score (sentiment falls).
    * 'Other' - anything else, including fewer than four scores, where the
      comparison cannot be made.
    """
    if len(quarterScores) < 4:
        # Not enough data points to compare opening, middle and closing.
        return 'Other'

    openingScore = quarterScores[0]
    middleScore = (quarterScores[1] + quarterScores[2]) / 2
    closingScore = quarterScores[3]

    if openingScore <= middleScore - 1 and middleScore < closingScore:
        return 'S'
    if openingScore >= middleScore + 1 and middleScore > closingScore:
        return 'Z'
    return 'Other'


def _participantMetric(detailsByParticipant, participant, metric):
    """Return detailsByParticipant[participant][metric], or None when the participant is absent."""
    if participant not in detailsByParticipant:
        return None
    return detailsByParticipant[participant][metric]


def processContactLensConversationCharacteristics(contactLensObj, connectBucket, transcripts):
    """Flatten a Contact Lens analysis object into a dict of result fields.

    Args:
        contactLensObj: Mapping holding the 'ConversationCharacteristics',
            'Categories' and 'CustomerMetadata' sections read below.
        connectBucket: Passed through to getRedactedRecordingLocation() when
            redacted recording import is enabled.
        transcripts: Iterable of mappings, each with a "content" entry; the
            contents are joined with single spaces. None or an empty
            iterable yields an empty string.

    Returns:
        dict: The extracted fields. Per-participant values (sentiment,
        interruptions, talk speed, talk time) are None when that participant
        is missing from the corresponding section. 'recordingPath' is None
        unless the contact attribute 'postcallRedactedRecordingImportEnabled'
        equals the string 'true'.

    Raises:
        KeyError: If one of the sections or non-participant keys read below
            is missing from contactLensObj.
    """
    resultSet = {}
    characteristics = contactLensObj['ConversationCharacteristics']
    sentiment = characteristics['Sentiment']

    # Overall Sentiment
    overallSentiment = sentiment['OverallSentiment']
    resultSet['contactLensAgentOverallSentiment'] = overallSentiment.get('AGENT')
    resultSet['contactLensCustomerOverallSentiment'] = overallSentiment.get('CUSTOMER')

    # Sentiment By Period
    quarterSentiment = sentiment['SentimentByPeriod']['QUARTER']
    if 'CUSTOMER' in quarterSentiment:
        customerScores = [quarter['Score'] for quarter in quarterSentiment['CUSTOMER']]
        resultSet['contactLensCustomerSentimentCurve'] = _classifyCustomerSentimentCurve(customerScores)
    else:
        resultSet['contactLensCustomerSentimentCurve'] = None

    # Interruptions Total Count
    interruptions = characteristics['Interruptions']
    interruptionsByInterrupter = interruptions['InterruptionsByInterrupter']
    resultSet['contactLensInterruptions'] = interruptions['TotalCount']
    # Membership test (not .get) so an absent participant stays None instead
    # of being serialized as the JSON string 'null'.
    resultSet['contactLensAgentInterruptions'] = (
        json.dumps(interruptionsByInterrupter['AGENT']) if 'AGENT' in interruptionsByInterrupter else None
    )
    resultSet['contactLensCustomerInterruptions'] = (
        json.dumps(interruptionsByInterrupter['CUSTOMER']) if 'CUSTOMER' in interruptionsByInterrupter else None
    )

    # Non Talk Time
    resultSet['contactLensNonTalkTime'] = characteristics['NonTalkTime']['TotalTimeMillis']

    # Talk Speed (a participant may be absent, as handled for sentiment above)
    talkSpeedByParticipant = characteristics['TalkSpeed']['DetailsByParticipant']
    resultSet['contactLensTalkSpeedCustomer'] = _participantMetric(talkSpeedByParticipant, 'CUSTOMER', 'AverageWordsPerMinute')
    resultSet['contactLensTalkSpeedAgent'] = _participantMetric(talkSpeedByParticipant, 'AGENT', 'AverageWordsPerMinute')

    # Talk time
    talkTime = characteristics['TalkTime']
    talkTimeByParticipant = talkTime['DetailsByParticipant']
    resultSet['contactLensTalkTimeTotal'] = talkTime['TotalTimeMillis']
    resultSet['contactLensTalkTimeCustomer'] = _participantMetric(talkTimeByParticipant, 'CUSTOMER', 'TotalTimeMillis')
    resultSet['contactLensTalkTimeAgent'] = _participantMetric(talkTimeByParticipant, 'AGENT', 'TotalTimeMillis')

    # Categories
    categories = contactLensObj['Categories']
    matchedCategories = categories['MatchedCategories']
    resultSet['contactLensMatchedCategories'] = '|'.join(matchedCategories) if matchedCategories else None
    resultSet['contactLensMatchedDetails'] = json.dumps(categories['MatchedDetails'])

    # Recording Path
    contactAttributes = getContactAttributes(contactLensObj)
    contactId = contactLensObj['CustomerMetadata']['ContactId']
    if 'postcallRedactedRecordingImportEnabled' in contactAttributes and contactAttributes['postcallRedactedRecordingImportEnabled'] == 'true':
        logger.info('Redacted recording import is enabled')
        resultSet['recordingPath'] = getRedactedRecordingLocation(contactId, connectBucket)
    else:
        resultSet['recordingPath'] = None

    # Transcript Full Text
    resultSet['contactLensTranscriptsFullText'] = ' '.join(
        transcript["content"] for transcript in (transcripts or [])
    )

    return resultSet