infra-as-code/modules/ingest-pipeline/cf-ccai-conversation-upload/insights_uploader.py (123 lines of code) (raw):
import google.auth
import google.auth.transport.requests
import google.cloud.logging
import requests
from google.cloud import storage

from record import RecordKeeper


class InsightsUploader:
"""
A class to upload conversations and their transcripts to Contact Center AI Insights.
Attributes:
project_id (str): The Google Cloud project ID.
insights_endpoint (str): The endpoint for the Contact Center AI Insights API.
insights_api_version (str): The version of the Contact Center AI Insights API.
ccai_insights_project_id (str): The project ID for CCAI Insights.
ccai_insights_location_id (str): The location ID for CCAI Insights.
ingest_record_bucket_id (str): The bucket ID for storing ingestion records.
"""
def __init__(self, project_id, insights_endpoint, insights_api_version, ccai_insights_project_id, ccai_insights_location_id, ingest_record_bucket_id):
"""
Initializes InsightsUploader with project and API settings.
Retrieves an OAuth token for authentication.
"""
self.project_id = project_id
self.insights_endpoint = insights_endpoint
self.insights_api_version = insights_api_version
self.ccai_insights_project_id = ccai_insights_project_id
self.ccai_insights_location_id = ccai_insights_location_id
self.oauth_token = self.get_oauth_token()
self.ingest_record_bucket_id = ingest_record_bucket_id
def get_client_credentials(self):
"""
Retrieves default client credentials for Google Cloud authentication.
Returns:
google.auth.credentials.Credentials: The client credentials.
"""
creds, _ = google.auth.default(
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
return creds
def get_oauth_token(self):
"""
Obtains an OAuth token using client credentials.
Returns:
str: The OAuth token.
"""
creds = self.get_client_credentials()
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req)
return creds.token
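    # Note: get_oauth_token() returns a short-lived access token (typically valid
    # for about an hour). It is fetched once in __init__, so a long-lived instance
    # could end up sending requests with an expired token.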
def get_gcs_uri(self, bucket, object_name):
"""
Constructs a Google Cloud Storage URI from bucket and object name.
Args:
bucket (str): The bucket name.
object_name (str): The object name.
Returns:
str: The GCS URI.
"""
return 'gs://{}/{}'.format(bucket, object_name)
def get_audiofile_metadata(self, bucket_name, object_name):
"""
Retrieves metadata from a Google Cloud Storage blob.
Args:
bucket_name (str): The bucket name.
object_name (str): The object name.
Returns:
dict: The metadata extracted from the blob.
Raises:
Exception: If unable to retrieve metadata.
"""
creds = self.get_client_credentials()
storage_client = storage.Client(credentials=creds)
bucket = storage_client.bucket(bucket_name)
        blob = bucket.get_blob(object_name)
        print("Bucket name: {}".format(bucket_name))
        print("Blob name: {}".format(object_name))
        if blob is None:
            raise Exception("Object gs://{}/{} not found".format(bucket_name, object_name))
        if blob.metadata:
metadata = dict()
            # TODO: define the agent ID with its corresponding value if it is needed
metadata['qualityMetadata'] = {"agentInfo":[{"agentId": "Undefined"}]}
metadata['agentId'] = "Undefined"
metadata['labels'] = dict()
if 'original_file_name' in blob.metadata:
metadata['labels']['original_file_name'] = blob.metadata['original_file_name']
if 'patient_id' in blob.metadata:
metadata['labels']['patient_phone_number'] = blob.metadata['patient_id']
if 'categories' in blob.metadata:
metadata['labels']['categories'] = blob.metadata['categories']
print("Retrieved metadata from file")
return metadata
        else:
            raise Exception("Unable to retrieve metadata from gs://{}/{}".format(bucket_name, object_name))
def log_error(self, operation_message, gcs_audio_uri, gcs_transcript_uri, endpoint_url):
"""
Logs an error message to Cloud Logging.
Args:
operation_message (str): The error message.
gcs_audio_uri (str): The GCS URI of the audio file.
gcs_transcript_uri (str): The GCS URI of the transcript file.
endpoint_url (str): The URL of the API endpoint.
"""
creds = self.get_client_credentials()
        client = google.cloud.logging.Client(project=self.project_id, credentials=creds)
logger = client.logger(name="cf_insights_uploader_logger")
entry = dict()
entry['function'] = 'An error occurred running the CCAI Insights upload conversation'
entry['operation_message'] = operation_message
entry['audio_file_gcs_path'] = gcs_audio_uri
entry['transcript_file_gcs_path'] = gcs_transcript_uri
entry['called_endpoint'] = endpoint_url
        logger.log_struct(entry, severity="ERROR")
print('Error logged')
def upload_insights_conversation(self, gcs_transcript_uri, metadata, gcs_audio_uri):
"""
Uploads a conversation to Contact Center AI Insights.
Args:
gcs_transcript_uri (str): The GCS URI of the transcript file.
metadata (dict): Metadata associated with the conversation.
gcs_audio_uri (str): The GCS URI of the audio file.
Returns:
str: The operation name for the upload request.
Raises:
Exception: If an error occurs during the upload.
"""
upload_conversation_url = (
'https://{}/{}/projects/{}/locations/{}/conversations:upload'
).format(self.insights_endpoint, self.insights_api_version, self.ccai_insights_project_id, self.ccai_insights_location_id)
headers = {
'charset': 'utf-8',
'Content-type': 'application/json',
'Authorization': 'Bearer {}'.format(self.oauth_token),
}
        data = dict()
        source = {'data_source': {'gcs_source': {'transcript_uri': gcs_transcript_uri, 'audio_uri': gcs_audio_uri}}}
        # Merge the GCS metadata labels (if any) with the conversation data source.
        data['conversation'] = {**metadata, **source} if metadata else source
        # Channel 1 carries the agent, channel 2 the customer.
        data['conversation']['call_metadata'] = {'agent_channel': 1, 'customer_channel': 2}
        # Use the audio file name (without its .flac extension) as the conversation ID.
        data['conversationId'] = gcs_audio_uri.split('/')[-1].replace('.flac', '')
r = requests.post(upload_conversation_url, headers=headers, json=data)
if r.status_code == requests.codes.ok:
print('Status ok')
            operation = r.json()
            return operation['name']
        else:
            print('Status not ok: {}'.format(r.status_code))
            try:
                r.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f'An error occurred running the CCAI Insights upload conversation operation: {e.response.text}')
                raise Exception(e.response.text) from e
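    # Illustrative sketch of the request body this method posts to the
    # conversations:upload endpoint (the URIs and labels below are placeholders):
    #
    #   {
    #     "conversation": {
    #       "labels": {"original_file_name": "call_0001.flac"},
    #       "qualityMetadata": {"agentInfo": [{"agentId": "Undefined"}]},
    #       "agentId": "Undefined",
    #       "data_source": {
    #         "gcs_source": {
    #           "transcript_uri": "gs://transcripts-bucket/call_0001.json",
    #           "audio_uri": "gs://redacted-audio-files/call_0001.flac"
    #         }
    #       },
    #       "call_metadata": {"agent_channel": 1, "customer_channel": 2}
    #     },
    #     "conversationId": "call_0001"
    #   }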
def upload(self, event):
"""
Cloud Function entry point for uploading conversations.
Processes a Cloud Storage event trigger to upload the corresponding conversation and transcript to CCAI Insights.
Args:
event (dict): The Cloud Storage event data.
"""
transcript_bucket_name = event.get("transcript_bucket")
transcript_file_name = event.get("transcript_filename")
event_bucket = event.get("event_bucket")
event_filename = event.get("event_filename")
print("Bucket name: {}".format(transcript_bucket_name))
print("File name: {}".format(transcript_file_name))
        transcript_uri = self.get_gcs_uri(transcript_bucket_name, transcript_file_name)
        if not event_filename:
            print('No audio to ingest')
            return
        audio_uri = self.get_gcs_uri("redacted-audio-files", event_filename)
        metadata = self.get_audiofile_metadata(event_bucket, event_filename)
        record_keeper = RecordKeeper(self.ingest_record_bucket_id, event.get('original_file_name'))
try:
operation_name = self.upload_insights_conversation(transcript_uri, metadata, audio_uri)
print('Created operation ID: {}'.format(operation_name))
record_keeper.replace_row(record_keeper.create_processed_record())
return operation_name
except Exception as e:
            upload_conversation_url = (
                'https://{}/{}/projects/{}/locations/{}/conversations:upload'
            ).format(self.insights_endpoint, self.insights_api_version, self.ccai_insights_project_id, self.ccai_insights_location_id)
            self.log_error(str(e), audio_uri, transcript_uri, upload_conversation_url)
            record_keeper.replace_row(
                record_keeper.create_error_record(f'An error occurred during upload conversation: {str(e)}'))
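

# Minimal usage sketch, illustrative only and not part of the deployed Cloud
# Function: the project IDs, endpoint, bucket names and file names below are
# placeholders, and running this requires Application Default Credentials with
# access to the referenced resources.
if __name__ == "__main__":
    uploader = InsightsUploader(
        project_id="my-project",                      # placeholder
        insights_endpoint="contactcenterinsights.googleapis.com",
        insights_api_version="v1",
        ccai_insights_project_id="my-project",        # placeholder
        ccai_insights_location_id="us-central1",      # placeholder
        ingest_record_bucket_id="my-ingest-records",  # placeholder
    )
    # The event shape mirrors the keys read in upload(); all values are hypothetical.
    sample_event = {
        "transcript_bucket": "my-transcripts-bucket",
        "transcript_filename": "call_0001.json",
        "event_bucket": "my-audio-bucket",
        "event_filename": "call_0001.flac",
        "original_file_name": "call_0001.flac",
    }
    uploader.upload(sample_event)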