# infra-as-code/modules/export-to-bq-incremental/function-source-code/lib.py (205 lines of code) (raw):
# Copyright 2024 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import json
import google.auth
import google.oauth2.credentials
import google.auth.transport.requests
import time
import datetime
import pandas as pd
import db_dtypes
from google.cloud import bigquery
class InsightsHelper:
    """
    A helper class for interacting with Contact Center AI Insights and BigQuery.

    This class provides methods for:
    - Retrieving an OAuth token for authentication.
    - Getting the status of an operation.
    - Submitting an export request to BigQuery.
    - Executing a merge query in BigQuery.
    - Getting the latest update timestamp from BigQuery.
    - Getting the conversation count from BigQuery and CCAI Insights.

    Attributes:
        ccai_insights_project_id (str): The project ID for CCAI Insights.
        ccai_insights_location_id (str): The location ID for CCAI Insights.
        bigquery_project_id (str): The project ID for BigQuery.
        bigquery_staging_dataset (str): The dataset for the staging table in BigQuery.
        bigquery_staging_table (str): The name of the staging table in BigQuery.
        bigquery_final_dataset (str): The dataset for the final table in BigQuery.
        bigquery_final_table (str): The name of the final table in BigQuery.
        INSIGHTS_BASE_URL (str): The versioned base URL of the CCAI Insights API.
        bq_client (google.cloud.bigquery.Client): The BigQuery client object.
        staging_table_id (str): The fully qualified ID of the staging table in BigQuery.
        final_table_id (str): The fully qualified ID of the final table in BigQuery.
        insights_url_with_location (str): The base URL for the CCAI Insights API with the location included.
    """

    ccai_insights_project_id: str
    ccai_insights_location_id: str
    bigquery_project_id: str
    bigquery_staging_dataset: str
    bigquery_staging_table: str
    bigquery_final_dataset: str
    bigquery_final_table: str
    bq_client = None
    INSIGHTS_BASE_URL = 'https://contactcenterinsights.googleapis.com/v1'
    insights_url_with_location = None

    def __init__(
        self,
        ccai_insights_project_id,
        ccai_insights_location_id,
        bigquery_project_id,
        bigquery_staging_dataset,
        bigquery_staging_table,
        bigquery_final_dataset,
        bigquery_final_table
    ):
        """
        Stores the configuration, creates the BigQuery client and derives the
        fully qualified table IDs and the location-scoped Insights URL.

        Args:
            ccai_insights_project_id (str): The project ID for CCAI Insights.
            ccai_insights_location_id (str): The location ID for CCAI Insights.
            bigquery_project_id (str): The project ID for BigQuery.
            bigquery_staging_dataset (str): The dataset of the staging table.
            bigquery_staging_table (str): The name of the staging table.
            bigquery_final_dataset (str): The dataset of the final table.
            bigquery_final_table (str): The name of the final table.
        """
        self.ccai_insights_project_id = ccai_insights_project_id
        self.ccai_insights_location_id = ccai_insights_location_id
        self.bigquery_project_id = bigquery_project_id
        self.bigquery_staging_dataset = bigquery_staging_dataset
        self.bigquery_staging_table = bigquery_staging_table
        self.bigquery_final_dataset = bigquery_final_dataset
        self.bigquery_final_table = bigquery_final_table

        self.bq_client = bigquery.Client()

        self.staging_table_id = (
            f'{self.bigquery_project_id}.{self.bigquery_staging_dataset}.{self.bigquery_staging_table}'
        )
        self.final_table_id = (
            f'{self.bigquery_project_id}.{self.bigquery_final_dataset}.{self.bigquery_final_table}'
        )
        self.insights_url_with_location = (
            self.INSIGHTS_BASE_URL
            + f'/projects/{self.ccai_insights_project_id}/locations/{self.ccai_insights_location_id}'
        )

    def get_token(self):
        """
        Retrieves an OAuth token for authentication.

        The application default credentials are looked up and refreshed on
        every call.

        Returns:
            str: The OAuth token.
        """
        creds, _ = google.auth.default(
            scopes=['https://www.googleapis.com/auth/cloud-platform'])
        auth_req = google.auth.transport.requests.Request()
        creds.refresh(auth_req)
        return creds.token

    def _auth_headers(self):
        """
        Builds the HTTP headers shared by all CCAI Insights REST calls.

        Returns:
            dict: Headers carrying a freshly retrieved bearer token.
        """
        return {
            'charset': 'utf-8',
            'Content-type': 'application/json',
            'Authorization': f'Bearer {self.get_token()}'
        }

    @staticmethod
    def _format_epoch_utc(epoch_seconds):
        """
        Formats a POSIX timestamp as a UTC string such as `2024-01-31T12:00:00Z`.

        The conversion is pinned to UTC so that the trailing `Z` is correct
        regardless of the local timezone of the host.

        Args:
            epoch_seconds (int | float): Seconds since the Unix epoch.

        Returns:
            str: The timestamp formatted as `%Y-%m-%dT%H:%M:%SZ`.
        """
        moment = datetime.datetime.fromtimestamp(
            epoch_seconds, tz=datetime.timezone.utc)
        return moment.strftime('%Y-%m-%dT%H:%M:%SZ')

    def get_operation(self, operation_name):
        """
        Gets the status of an operation.

        Args:
            operation_name (str): The name of the operation.

        Returns:
            dict: The operation details.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        url = f'{self.INSIGHTS_BASE_URL}/{operation_name}'
        response = requests.get(url, headers=self._auth_headers())
        response.raise_for_status()
        return response.json()

    def submit_export_request(self, filter):
        """
        Submits an export request to BigQuery.

        The export targets the staging table and uses `WRITE_TRUNCATE`.

        Args:
            filter (str): The filter to apply to the export request.

        Returns:
            dict: The response from the export request.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        headers = self._auth_headers()
        request_data = {
            'parent': f'projects/{self.ccai_insights_project_id}/locations/{self.ccai_insights_location_id}',
            'writeDisposition': 'WRITE_TRUNCATE',
            'bigQueryDestination': {
                'projectId': self.bigquery_project_id,
                'dataset': self.bigquery_staging_dataset,
                'table': self.bigquery_staging_table
            },
            'filter': filter
        }
        print('BQ Export Request Data:')
        print(request_data)

        url = f'{self.insights_url_with_location}/insightsdata:export'
        response = requests.post(url, headers=headers, json=request_data)
        response.raise_for_status()
        return response.json()

    def execute_merge_query(self):
        """
        Executes a merge query in BigQuery to update or insert data from the staging table to the final table.

        Rows are matched on `conversationName`. A matched row is updated only
        when its `conversationUpdateTimestampUtc` differs from the staging
        row; unmatched staging rows are inserted. The method blocks until the
        query job has finished and prints the job ID and affected row count.
        """
        # The SQL text is kept flush-left so that the statement which is
        # logged and sent to BigQuery carries no Python indentation.
        merge_query = f'''
MERGE `{self.final_table_id}` T
USING (
SELECT
conversationName,
audioFileUri,
dialogflowConversationProfileId,
startTimestampUtc,
loadTimestampUtc,
analysisTimestampUtc,
conversationUpdateTimestampUtc,
year,
month,
day,
durationNanos,
silenceNanos,
silencePercentage,
agentSpeakingPercentage,
clientSpeakingPercentage,
agentSentimentScore,
agentSentimentMagnitude,
clientSentimentScore,
clientSentimentMagnitude,
transcript,
turnCount,
languageCode,
medium,
issues,
entities,
labels,
words,
sentences,
latestSummary,
qaScorecardResults,
agents
FROM `{self.staging_table_id}`) S
ON T.conversationName = S.conversationName
WHEN MATCHED AND T.conversationUpdateTimestampUtc != S.conversationUpdateTimestampUtc THEN
UPDATE SET
audioFileUri = S.audioFileUri,
dialogflowConversationProfileId = S.dialogflowConversationProfileId,
startTimestampUtc = S.startTimestampUtc,
loadTimestampUtc = S.loadTimestampUtc,
analysisTimestampUtc = S.analysisTimestampUtc,
conversationUpdateTimestampUtc = S.conversationUpdateTimestampUtc,
year = S.year,
month = S.month,
day = S.day,
durationNanos = S.durationNanos,
silenceNanos = S.silenceNanos,
silencePercentage = S.silencePercentage,
agentSpeakingPercentage = S.agentSpeakingPercentage,
clientSpeakingPercentage = S.clientSpeakingPercentage,
agentSentimentScore = S.agentSentimentScore,
agentSentimentMagnitude = S.agentSentimentMagnitude,
clientSentimentScore = S.clientSentimentScore,
clientSentimentMagnitude = S.clientSentimentMagnitude,
transcript = S.transcript,
turnCount = S.turnCount,
languageCode = S.languageCode,
medium = S.medium,
issues = S.issues,
entities = S.entities,
labels = S.labels,
words = S.words,
sentences = S.sentences,
latestSummary = S.latestSummary,
qaScorecardResults = S.qaScorecardResults,
agents = S.agents
WHEN NOT MATCHED THEN
INSERT ROW
'''
        print('Merge query to be executed:')
        print(merge_query)

        merge_job = self.bq_client.query(merge_query)  # API request
        merge_job.result()  # Waits for query to finish

        print('Merge query result:')
        print(f'job_id: {merge_job.job_id}')
        print(f'num_dml_affected_rows: {merge_job.num_dml_affected_rows}')

    def get_latest_update_time(self):
        """
        Gets the latest `conversationUpdateTimestampUtc` stored in the final table.

        The column value is passed to `datetime.fromtimestamp`, i.e. it is
        treated as seconds since the Unix epoch.

        Returns:
            str | None: The timestamp formatted as `%Y-%m-%dT%H:%M:%SZ` in
            UTC, or None when the query yields no timestamp (for example
            when the final table is empty).
        """
        query = (
            '\nSELECT MAX(conversationUpdateTimestampUtc) as maxUpdateTimestamp '
            f'FROM `{self.final_table_id}`\n'
        )
        rows = self.bq_client.query(query).result()

        max_update_timestamp = None
        for row in rows:
            max_update_timestamp = row['maxUpdateTimestamp']

        if max_update_timestamp is None:
            print('maxUpdateTimestamp not found!')
            return None

        print(f'maxUpdateTimestamp found: `{max_update_timestamp}`')
        # Convert explicitly in UTC: a naive local-time conversion would make
        # the literal 'Z' suffix wrong on hosts whose timezone is not UTC.
        formatted_time = self._format_epoch_utc(max_update_timestamp)
        print(f'maxUpdateTimestamp found (formatted): `{formatted_time}`')
        return formatted_time

    def get_conversation_count_bq(self):
        """
        Gets the number of rows in the final BigQuery table.

        Returns:
            int: The conversation count reported by BigQuery.

        Raises:
            RuntimeError: If the query returns no count.
        """
        query = (
            '\nSELECT count(*) as conversationCount '
            f'FROM `{self.final_table_id}`\n'
        )
        rows = self.bq_client.query(query).result()

        conversation_count = None
        for row in rows:
            conversation_count = row['conversationCount']

        if conversation_count is None:
            raise RuntimeError(
                f'There was a problem fetching the conversation count from `{self.final_table_id}`')

        print(f'BQ conversationCount: `{conversation_count}`')
        return conversation_count

    def get_conversation_count_insights(self):
        """
        Gets the conversation count reported by CCAI Insights.

        Calls `conversations:calculateStats` for the configured project and
        location.

        Returns:
            int: The `conversationCount` field of the response, or 0 when the
            response does not contain that field.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        url = f'{self.insights_url_with_location}/conversations:calculateStats'
        response = requests.get(url, headers=self._auth_headers())
        response.raise_for_status()

        # NOTE(review): proto3 JSON responses usually omit zero-valued fields,
        # so a missing key is read as "no conversations" instead of raising
        # KeyError - confirm against the calculateStats response reference.
        conversation_count = response.json().get('conversationCount', 0)
        print(f'Insights conversationCount: `{conversation_count}`')
        return conversation_count