# End to End Baseline Summarisation

In this notebook you will use the configured conversation profile from earlier in the lab to perform summarization of audio transcripts with redacted PII. You will need the integration ID of your conversation profile created earlier to complete this lab.


# Installing required libraries and Authenticating GCP Credentials

In [None]:
! pip install -q google-cloud-storage google-cloud-dlp google-cloud-dialogflow google-cloud-speech

__IMPORTANT:__ You may ignore any error messages about the dependency resolver. After installing packages, restart the kernel for the notebook by going to __Kernel__ and __Restart Kernel__ before moving forward. You do not need to run the first cell again after completing the package installation

## Configure Google Cloud credentials

__Note:__ Replace `project-name` with your Project ID. You will need to uncomment the commented lines first if you are running this notebook in a Google Colab environment.

In [3]:
PROJECT_NAME='your-project-id' 

!gcloud config set project $PROJECT_NAME

Updated property [core/project].


## Import required libraries

In [4]:
from typing import Dict, List
import csv
import glob
import json
import time
import re
import json
import pandas as pd
import pickle
from google.cloud import storage
from google.cloud import speech_v1p1beta1 as speech
import google.cloud.dlp
from google.cloud import dialogflow_v2beta1 as dialogflow
import datetime

Replace the value of the`CONV_PROFILE_ID` variable with the integration ID you recorded earlier.


In [5]:
CONV_PROFILE_ID = "projects/your-project-id/locations/global/conversationProfiles/conv-profile-id"
GCS_BUCKET_URI = "gs://cloud-training" 
GCS_BUCKET_NAME = GCS_BUCKET_URI.split("//")[1]
AUDIO_FILE_INPUT_FOLDER_PREFIX = "specialized-training/ccai/audio_summarization" 
SUPPORTED_FILE_FORMATS = ["WAV","MP3"]

NUM_CHANNELS = 1 # Audio file for this lab is single channel. Update for dual channel.
LANGUAGE_CODE = "en-US" 
MODEL ="phone_call"
SAMPLE_RATE_HZ = 8000 
ENCODING = "MULAW" 
MIN_SPEAKER_COUNT = 2 
MAX_SPEAKER_COUNT = 2 

SPEECH_TO_TEXT_CONFIG = {
"audio_channel_count" : NUM_CHANNELS,
"model" : "phone_call",
"encoding" : ENCODING,
"diarization_config" : speech.SpeakerDiarizationConfig(enable_speaker_diarization=True,max_speaker_count=MAX_SPEAKER_COUNT,
                                                       min_speaker_count=MIN_SPEAKER_COUNT),
"language_code" : LANGUAGE_CODE,
"enableSeparateRecognitionPerChannel" : True if NUM_CHANNELS>1 else False,
"sample_rate_hertz" : SAMPLE_RATE_HZ
}

project_id = PROJECT_NAME
location = "global"
project_path = '/'.join(CONV_PROFILE_ID.split('/')[:4])
conversation_profile_id = CONV_PROFILE_ID

# Step 1: Transcribe conversation and run PII redaction on transcripts

## Utility Functions

Before summarizing transcripts, you will need to transcribe the audio to text and redact possibly sensitive information found in the transcripts. This will lower the risk of accidental data leakage.

**Note**: `INFO_TYPES` should be fine-tuned to fit customer's requirements. The existing `INFO_TYPES` in the cell below is the default setting but is subject to developer's discretion. To fine-tune `INFO_TYPES`, please refer to https://cloud.google.com/dlp/docs/infotypes-reference

First, instaniate a client to interact with the Data Loss Prevention (DLP) API and a function (`redact_dlp`) to redact sensitive information

In [6]:
dlp = google.cloud.dlp_v2.DlpServiceClient()
INFO_TYPES = ["AGE","CREDIT_CARD_NUMBER","CREDIT_CARD_TRACK_NUMBER","DATE","DATE_OF_BIRTH",
           "DOMAIN_NAME","EMAIL_ADDRESS","FEMALE_NAME","MALE_NAME","FIRST_NAME","GENDER",
           "GENERIC_ID","IP_ADDRESS","LAST_NAME","LOCATION","PERSON_NAME","PHONE_NUMBER",
           "STREET_ADDRESS"]

def redact_dlp(input_str,replacement_str=r"[redacted]"):

    inspect_config = {"info_types": [{"name": info_type} for info_type in INFO_TYPES]}
    deidentify_config = {
        "info_type_transformations": {
            "transformations": [
                {
                    "primitive_transformation": {
                        "replace_config": {
                            "new_value": {"string_value": replacement_str}
                        }
                    }
                }
            ]
        }
    }
    item = {"value": input_str}
    response = dlp.deidentify_content(
        request={
            "parent" :"projects/{}".format(PROJECT_NAME),
            "deidentify_config": deidentify_config,
            "inspect_config": inspect_config,
            "item": item,
        }
    )

    return str(response.item.value).strip()

Before defining a function to apply the DLP API, you define a function to transcribe your audio recording. The code following the definition of the `parse_transcript_results_single_channel` and `parse_transcript_results_dual_channel` functions imports the transcribed audio into a Pandas dataframe to make it easier to parse and apply the DLP API to the appropriate field of the transcripts.

In [7]:
def is_agent_msg_text(text):

  text = text.lower()

  if 'thank you for calling' in text or 'monitored line' in text \
      or 'recorded line' in text or 'monitor line' in text \
      or 'how can i help you' in text or 'calling from' in text or 'may i ask' in text\
      or 'advanced tech support' in text or 'support' in text:

    return True

  else:

    return False

def parse_transcript_results_single_channel(file_name, results):

    words_info = results[-1].alternatives[0].words
    transcript_records_list = list()
    current_speaker_tag = words_info[0].speaker_tag
    agent_speaker_tag=words_info[0].speaker_tag #Generally, the first speaker is agent as observed by manually analysing transcripts in case channel is 1
    current_speaker_speech_text = ""
    turn = 0

    for _word in words_info:

        if _word.speaker_tag == current_speaker_tag:

            current_speaker_speech_text = current_speaker_speech_text + " " + _word.word

        else:

            transcript_records_list.append({ "transcript_id" : file_name,"position" : turn ,
                                        "actor":"AGENT" if current_speaker_tag==agent_speaker_tag else "CUSTOMER",
                                       "speaker" : current_speaker_tag,
                                       "utterance" : redact_dlp(current_speaker_speech_text.strip())})
            current_speaker_tag = _word.speaker_tag
            current_speaker_speech_text = "" + _word.word
            turn += 1

    return transcript_records_list

# This function is not used in this lab, but recorded here for reference if working with dual channel audio.
def parse_transcript_results_dual_channel(file_name, results):

    result_list=[]
    agent_channel=None

    for index,_result in enumerate(results):

        result_dict={}
        result_dict['transcript_id']=file_name
        _transcript =_result.alternatives[0].transcript
        _channel_tag=str(_result.channel_tag)

        if _transcript=='':

            continue

        result_dict['position']= index

        if agent_channel is None:

            if is_agent_msg_text(_transcript):

                if _channel_tag=='1':

                    agent_channel='1'

                else:

                    agent_channel='2'

            else:

                if _channel_tag=='1':

                    agent_channel='2'

                else:

                    agent_channel='1'

        result_dict['actor']= 'AGENT' if _channel_tag==agent_channel else 'CUSTOMER'
        result_dict['channel']=_channel_tag
        result_dict['utterance']= redact_dlp(_transcript.strip())
        result_list.append(result_dict)

    return result_list

In [8]:
#Create clients for the Speech and Storage APIs

speech_client = speech.SpeechClient()
storage_client = storage.Client()

# Define configuration for Speech API

config = speech.RecognitionConfig(
    audio_channel_count= SPEECH_TO_TEXT_CONFIG["audio_channel_count"],
    model= SPEECH_TO_TEXT_CONFIG["model"],
    encoding=SPEECH_TO_TEXT_CONFIG["encoding"],
    diarization_config= SPEECH_TO_TEXT_CONFIG["diarization_config"],
    language_code= SPEECH_TO_TEXT_CONFIG["language_code"],
    enable_separate_recognition_per_channel= SPEECH_TO_TEXT_CONFIG["enableSeparateRecognitionPerChannel"],
    sample_rate_hertz=SPEECH_TO_TEXT_CONFIG["sample_rate_hertz"],
    enable_automatic_punctuation= True,
    enable_word_time_offsets= True,
    enable_word_confidence=True,
    use_enhanced= True
    )

INPUT_AUDIO_FILES_GCS_PATHS = storage_client.list_blobs(GCS_BUCKET_NAME, prefix= AUDIO_FILE_INPUT_FOLDER_PREFIX)
index = 1
all_transcripts = []
_bucket = storage_client.get_bucket(GCS_BUCKET_NAME)

# For each audio file do the following:
#  1. Import the audio file
#  2. Transcribe the conversation using the Speech API
#  3. Parse the transcipts and redact sensitive information using the DLP API.

for audio in INPUT_AUDIO_FILES_GCS_PATHS:

  if (str(audio.name).split("/")[1] != '') and (str(audio.name).split("/")[-1].split(".")[-1].upper() in SUPPORTED_FILE_FORMATS):
    
    try:

      audio_file = speech.RecognitionAudio(uri= GCS_BUCKET_URI + '/' + str(audio.name))
      response = speech_client.long_running_recognize(config=config, audio=audio_file)
      response_results = response.result()

    except Exception as e:

      print("Exception Occurred for Audio: {}".format(e))
      continue

    if(len(response_results.results) != 0):

      temp=[]

      if SPEECH_TO_TEXT_CONFIG["audio_channel_count"]==2:

        temp = parse_transcript_results_dual_channel(str(audio.name).split("/")[1].split('.')[0],response_results.results)

      else:

        temp = parse_transcript_results_single_channel(str(audio.name).split("/")[1].split('.')[0],response_results.results)

      all_transcripts.extend(temp)
    
    else:
        print('No files found.')

    print(f"Audio Transcribed :: {str(index)}")
    index += 1
    time.sleep(5) # Wait 5 sec per request to prevent "ResourceExhausted" error and avoid empty transcripts

Audio Transcribed :: 1


Before applying the baseline summarization model, you should explore the preprocessed and redacted output from one of the conversations. Here you will convert the `all_transcripts` into a Pandas dataframe and then look at one of the conversations. Note the portions of the conversation that were redacted by the DLP API.

In [9]:
eval_df = pd.DataFrame(all_transcripts)
eval_df

Unnamed: 0,transcript_id,position,actor,speaker,utterance
0,ccai,0,AGENT,1,Hello. Thank you for contacting the customer s...
1,ccai,1,CUSTOMER,2,to find out if there is a time slot for my exi...
2,ccai,2,AGENT,1,time. Sure. I can help you with that. I just n...
3,ccai,3,CUSTOMER,2,a few things. Can I get your name?
4,ccai,4,AGENT,1,This is [redacted]
5,ccai,5,CUSTOMER,2,and I'm showing a Yahoo email address on your ...
6,ccai,6,AGENT,1,"yes, I am Shopper"
7,ccai,7,CUSTOMER,2,42 great. Do you happen to have a order number...
8,ccai,8,AGENT,1,"If not, I can look it up. Let me"
9,ccai,9,CUSTOMER,2,grab it off the email I got with the order con...


# Step 2: Generate summaries from Baseline Summarization Model

In this step you will generate summaries for the redacted transcripts from the previous steps after defining a sequence of helper functions to work through the appropriate steps. The comments in the code give a rough description of each of the helper functions being created.

In [10]:
# Function to create a conversation for a given conservation profile

def create_conversation(client: dialogflow.ConversationsClient, project_id: str,
                        conversation_profile_id: str):

  conversation = dialogflow.Conversation()
  conversation.conversation_profile = conversation_profile_id

  request = dialogflow.CreateConversationRequest(
      parent=project_id,
      conversation=conversation,
  )
  response = client.create_conversation(request=request)
  return response

# Function to create a participant for a conversation (with a given conversation_id) with a specific role

def create_participant(client: dialogflow.ParticipantsClient, conversation_id,
                       role: dialogflow.Participant.Role):

  request = dialogflow.CreateParticipantRequest(
      parent=conversation_id,
      participant=dialogflow.Participant(role=role),
  )
  response = client.create_participant(request=request)

  return response

# Function to suggest a conversation summary using the configured conversation profile.

def suggest_conversation_summary(client: dialogflow.ConversationsClient,
                                 conversation_id: str):

  request = dialogflow.SuggestConversationSummaryRequest(
      conversation=conversation_id,)
  response = client.suggest_conversation_summary(request=request)

  return response

# Function to complete a conversation with a given conversation id.

def complete_conversation(client: dialogflow.ConversationsClient,
                          conversation_id: str):

  request = dialogflow.CompleteConversationRequest(name=conversation_id,)
  response = client.complete_conversation(request)

  return response

# Function to return a summary for a conversation using a specific conversation profile
# using the earlier helper functions.

def get_summary(
    conversations_client: dialogflow.ConversationsClient,
    participants_client: dialogflow.ParticipantsClient,
    project_id: str,
    conversation_profile_id: str,
    conversation,
):

  create_conversation_response = create_conversation(
      client=conversations_client,
      project_id=project_id,
      conversation_profile_id=conversation_profile_id,
  )
  conversation_id = create_conversation_response.name

  create_end_user_participant_response = create_participant(
      client=participants_client,
      conversation_id=conversation_id,
      role=dialogflow.Participant.Role.END_USER,
  )
  end_user_participant_id = create_end_user_participant_response.name

  create_human_agent_participant_response = create_participant(
      client=participants_client,
      conversation_id=conversation_id,
      role=dialogflow.Participant.Role.HUMAN_AGENT,
  )
  human_agent_participant_id = create_human_agent_participant_response.name

  batch_request = dialogflow.BatchCreateMessagesRequest()
  batch_request.parent = conversation_id
  turn_count = 0
  for role, text in conversation:
    if turn_count > 199: # API was erroring out if the conv length is more than 200
      # Pushing first 200 messages into the conversation
      batch_response = conversations_client.batch_create_messages(request=batch_request)

      # re-initiatizing batch request to continue updating messages
      batch_request = dialogflow.BatchCreateMessagesRequest()
      batch_request.parent = conversation_id

      turn_count = 0

    participant_id = human_agent_participant_id if role == 'AGENT' else end_user_participant_id

    #Batch creating Conversation
    requests = dialogflow.CreateMessageRequest()
    requests.parent = conversation_id
    requests.message.content = text
    requests.message.participant = participant_id
    requests.message.send_time = datetime.datetime.now()

    batch_request.requests.append(requests)
    turn_count += 1

  batch_create_message_response = conversations_client.batch_create_messages(request=batch_request)
  suggest_conversation_summary_response = suggest_conversation_summary(
      client=conversations_client,
      conversation_id=conversation_id,
  )

  return suggest_conversation_summary_response

Now call the Summarization API for transcript summarization to add the summary to the conversation strings.

In [11]:
conversations_client = dialogflow.ConversationsClient()
participants_client = dialogflow.ParticipantsClient()
results = []

for conversation_id in eval_df['transcript_id'].unique():

  #print(f'Running inference for: {conversation_id}')
  
  conversation = []
  conversation_df = eval_df.loc[(eval_df['transcript_id'] == conversation_id)]

  for idx in conversation_df.index:

    conversation.append((conversation_df.loc[idx, 'actor'], conversation_df.loc[idx, 'utterance']))

  get_summary_response = get_summary(
      conversations_client=conversations_client,
      participants_client=participants_client,
      project_id=project_path,
      conversation_profile_id=conversation_profile_id,
      conversation=conversation,
  )

  conversation_string = '\n'.join(
      (f'{role}: {text}' for role, text in conversation))
  results.append({
      'transcript_id': conversation_id,
      'full_conversation': conversation_string,
      'summary': get_summary_response.summary.text
  })


Now we can explore the output from the baseline summarization model for the conversation that you looked at earlier.

In [12]:
summ_df = pd.DataFrame(results)
print(summ_df.iloc[0]['summary'].replace('\n','\n\n'))

situation

Customer wants to know if there is a time slot for their existing delivery. Customer also wants to know if they can have the electric dryer installed.

action

Agent checks the order status and informs the customer that the delivery is scheduled for [redacted] between 1 p.m. and 5 p.m. Agent also offers to schedule an installation for 69.99, but the customer declines.

resolution

Partial
