# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Persona 5 routers - Contact Center Analyst
"""
import json
import tomllib
from fastapi import APIRouter, HTTPException
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import firestore
from google.cloud.firestore_v1.base_query import FieldFilter
from proto import Message
from app.models.p5_model import (
Customer,
GenerateConversationsInsightsRequest,
GenerateConversationsInsightsResponse,
GenerateReviewsInsightsRequest,
GenerateReviewsInsightsResponse,
SearchConversationsRequest,
SearchConversationsResponse,
SearchReviewsRequest,
SearchReviewsResponse,
VectorFindNeighborRequest,
VectorFindNeighborResponse,
)
from app.utils import (
utils_cloud_nlp,
utils_gemini,
utils_palm,
utils_search,
utils_vertex_vector,
)
# Load configuration file
with open("app/config.toml", "rb") as f:
config = tomllib.load(f)
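# Keys read from the [search-persona5] table of app/config.toml (a sketch
# inferred from usage in this module; the actual values are project-specific):
#   - firestore_customers, firestore_conversations, firestore_reviews
#   - conversations_datastore_id, reviews_datastore_id
#   - prompt_summary_*, prompt_insights_*, prompt_pending_tasks_*,
#     prompt_nbs_* (one variant each for single/multi conversations and reviews)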
db = firestore.Client()
router = APIRouter(prefix="/p5", tags=["P5 - Contact Center Analyst"])
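# This router is expected to be included by the main FastAPI app, e.g.
# (assuming the usual layout of this repo):
#   from app.routers import p5_contact_center_analyst
#   app.include_router(p5_contact_center_analyst.router)
# All endpoints below are then served under the /p5 prefix.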
# ---------------------------------GET----------------------------------------#
@router.get(path="/customer/{customer_id}")
def get_customer_info(customer_id: str) -> Customer:
"""
# Retrieve customer information from Firestore
- CDP data
- Conversations
- Reviews
## Request Parameters:
**customer_id**: *string*
- Unique identifier of the customer
## Customer:
**conversations**: *list*
- List of all the conversations that customer had with the Call Center
**reviews**: *list*
    - List of all the reviews submitted by that customer
**customer_info**: *dict*
- Information about that customer extracted from the CDP
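    ## Example:
    An illustrative client call (the base URL and customer ID below are
    hypothetical):
    ```python
    import requests

    BASE_URL = "http://localhost:8080"  # adjust to your deployment
    resp = requests.get(f"{BASE_URL}/p5/customer/customer-123")
    resp.raise_for_status()
    customer = resp.json()
    print(customer["customer_info"])
    print(len(customer["conversations"]), len(customer["reviews"]))
    ```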
"""
customer_info = (
db.collection(config["search-persona5"]["firestore_customers"])
.document(customer_id)
.get()
)
    if customer_info.exists:
        customer_info = customer_info.to_dict() or {}
    else:
        raise HTTPException(status_code=400, detail="Customer ID not found.")
conversations = (
db.collection(config["search-persona5"]["firestore_conversations"])
.where(filter=FieldFilter("customer_id", "==", customer_id))
.get()
)
    conversations_list = [doc.to_dict() for doc in conversations]
reviews = (
db.collection(config["search-persona5"]["firestore_reviews"])
.where(filter=FieldFilter("customer_id", "==", customer_id))
.get()
)
    reviews_list = [doc.to_dict() for doc in reviews]
return Customer(
conversations=conversations_list,
reviews=reviews_list,
customer_info=customer_info,
)
# ---------------------------------POST---------------------------------------#
@router.post(path="/generate-conversations-insights")
def generate_insights_conversations(
data: GenerateConversationsInsightsRequest,
) -> GenerateConversationsInsightsResponse:
"""
# Generate insights from conversations.
- Summary
    - Insights (what went well and what did not)
- Pending tasks
- Next best action
## Request Body [GenerateConversationsInsightsRequest]:
**conversations**: *list*
- Conversations to generate the insights from
## Response Body [GenerateConversationsInsightsResponse]:
**summary**: *string*
- Summary of the conversations
**entities**: *list*
- Entities extracted with Cloud NL API
**insights**: *string*
- Insights from the conversations
**pending_tasks**: *string*
- Pending tasks from the conversations
**next_best_action**: *string*
- Next best action extracted from the conversations
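    ## Example:
    An illustrative request (the conversation payload shape is hypothetical;
    any JSON-serializable list is forwarded to the prompts as-is):
    ```python
    import requests

    BASE_URL = "http://localhost:8080"  # adjust to your deployment
    payload = {
        "conversations": [
            {"title": "Late delivery", "conversation": "Customer: ..."},
        ]
    }
    resp = requests.post(
        f"{BASE_URL}/p5/generate-conversations-insights", json=payload
    )
    resp.raise_for_status()
    print(resp.json()["summary"])
    ```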
"""
if len(data.conversations) > 1:
prompt_summary = config["search-persona5"][
"prompt_summary_multi_conversations"
]
prompt_insights = config["search-persona5"][
"prompt_insights_multi_conversations"
]
prompt_tasks = config["search-persona5"][
"prompt_pending_tasks_multi_conversations"
]
prompt_nbs = config["search-persona5"][
"prompt_nbs_multi_conversations"
]
else:
prompt_summary = config["search-persona5"][
"prompt_summary_conversation"
]
prompt_insights = config["search-persona5"][
"prompt_insights_conversation"
]
prompt_tasks = config["search-persona5"][
"prompt_pending_tasks_conversation"
]
prompt_nbs = config["search-persona5"]["prompt_nbs_conversation"]
input_text = json.dumps({"conversations": data.conversations})
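    # Each prompt template is expected to contain a single positional "{}"
    # placeholder that receives the JSON-serialized input, e.g. a template
    # along the lines of (hypothetical; the real text lives in config.toml):
    #   "Summarize the following contact center conversations: {}"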
try:
summary = utils_gemini.generate_gemini_pro_text(
prompt=prompt_summary.format(input_text)
)
insights = utils_gemini.generate_gemini_pro_text(
prompt=prompt_insights.format(input_text)
)
pending_tasks = utils_gemini.generate_gemini_pro_text(
prompt=prompt_tasks.format(input_text)
)
next_best_action = utils_gemini.generate_gemini_pro_text(
prompt=prompt_nbs.format(input_text)
)
    except GoogleAPICallError as e:
        raise HTTPException(
            status_code=400,
            detail=f"Error calling Vertex AI Gemini API. {str(e)}",
        ) from e
try:
entities = utils_cloud_nlp.nlp_analyze_entities(input_text)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error calling Google Cloud NL API. " f"{str(e)}",
) from e
return GenerateConversationsInsightsResponse(
summary=summary,
entities=entities,
insights=insights,
pending_tasks=pending_tasks,
next_best_action=next_best_action,
)
@router.post(path="/generate-reviews-insights")
def generate_insights_reviews(
data: GenerateReviewsInsightsRequest,
) -> GenerateReviewsInsightsResponse:
"""
# Generate insights from reviews.
- Summary
    - Insights (what went well and what did not)
- Pending tasks
- Next best action
- Entities
## Request Body [GenerateReviewsInsightsRequest]:
**reviews**: *list*
- Reviews to generate the insights from
## Response Body [GenerateReviewsInsightsResponse]:
**summary**: *string*
- Summary of the reviews
**entities**: *list*
- Entities extracted with Cloud NL API
**insights**: *string*
- Insights from the reviews
**pending_tasks**: *string*
- Pending tasks from the reviews
**next_best_action**: *string*
- Next best action extracted from the reviews
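    ## Example:
    An illustrative request (the review shape is hypothetical):
    ```python
    import requests

    BASE_URL = "http://localhost:8080"  # adjust to your deployment
    payload = {
        "reviews": [
            {"title": "Comfy sofa", "review": "Great sofa, slow delivery."},
        ]
    }
    resp = requests.post(
        f"{BASE_URL}/p5/generate-reviews-insights", json=payload
    )
    resp.raise_for_status()
    insights = resp.json()
    print(insights["summary"])
    print(insights["next_best_action"])
    ```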
"""
prompt_summary = config["search-persona5"]["prompt_summary_reviews"]
prompt_insights = config["search-persona5"]["prompt_insights_reviews"]
prompt_tasks = config["search-persona5"]["prompt_pending_tasks_reviews"]
prompt_nbs = config["search-persona5"]["prompt_nbs_reviews"]
input_text = json.dumps({"reviews": data.reviews})
try:
summary = utils_gemini.generate_gemini_pro_text(
prompt=prompt_summary.format(input_text)
)
insights = utils_gemini.generate_gemini_pro_text(
prompt=prompt_insights.format(input_text)
)
pending_tasks = utils_gemini.generate_gemini_pro_text(
prompt=prompt_tasks.format(input_text)
)
next_best_action = utils_gemini.generate_gemini_pro_text(
prompt=prompt_nbs.format(input_text)
)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error calling Vertex AI Gemini API. " f"{str(e)}",
) from e
try:
entities = utils_cloud_nlp.nlp_analyze_entities(input_text)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error calling Google Cloud NL API. " f"{str(e)}",
) from e
return GenerateReviewsInsightsResponse(
summary=summary,
entities=entities,
insights=insights,
pending_tasks=pending_tasks,
next_best_action=next_best_action,
)
@router.post(path="/search-conversations")
def search_conversations(
data: SearchConversationsRequest,
) -> SearchConversationsResponse:
"""
# Search for conversations on Vertex AI Search Datastore
## Request Body [SearchConversationsRequest]:
**query**: *string*
- User input to search the datastore
**user_pseudo_id**: *string*
- User unique ID
**conversation_id**: *string*
- ID of the Vertex AI Search Conversation
**rating**: *list*
- Filter field for conversation rating
- Allowed values
- 1
- 2
- 3
- 4
- 5
**status**: *list*
- Filter field for conversation status
- Allowed values
- resolved
- not resolved
**sentiment**: *list*
- Filter field for conversation sentiment
- Allowed values
- positive
- negative
- neutral
**category**: *list*
- Filter field for conversation category
- Allowed values
- Bath Robe
- Bath Towel Set
- Bed
- Bookcase
- Chair
- Console Table
- Dining Table
- Game Table
- Grill
- Office Chair
- Ottoman
- Outdoor Heater
- Pool
- Sofa
- Tool Cabinet
**agent_id**: *string*
- Filter field for conversation agent_id
**customer_id**: *string*
- Filter field for conversation customer_id
**product_id**: *string*
- Filter field for conversation product_id
## Response Body [SearchConversationsResponse]:
    **responses**: *dict*
- Search results, including information about the conversation
**conversation_id**: *string*
- ID of the Vertex AI Search Conversation
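    ## Example:
    An illustrative first-turn request (IDs are hypothetical). An empty
    **conversation_id** starts a new multi-turn session; reuse the returned
    **conversation_id** on follow-up queries:
    ```python
    import requests

    BASE_URL = "http://localhost:8080"  # adjust to your deployment
    payload = {
        "query": "unresolved complaints about sofa deliveries",
        "user_pseudo_id": "analyst-001",
        "conversation_id": "",
        "rating": ["1", "2"],
        "status": ["not resolved"],
        "sentiment": ["negative"],
        "category": ["Sofa"],
        "agent_id": "",
        "customer_id": "",
        "product_id": "",
    }
    resp = requests.post(f"{BASE_URL}/p5/search-conversations", json=payload)
    resp.raise_for_status()
    body = resp.json()
    print(body["responses"]["summary"])
    # Reuse the session on the next turn:
    follow_up = {**payload, "conversation_id": body["conversation_id"]}
    ```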
"""
if not data.conversation_id:
try:
conversation_id = utils_search.create_new_conversation(
user_pseudo_id=data.user_pseudo_id,
datastore_id=config["search-persona5"][
"conversations_datastore_id"
],
).name
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error creating a Vertex AI Conversation session. "
f"{str(e)}",
) from e
else:
conversation_id = data.conversation_id
search_filter = ""
if data.rating:
search_filter += 'rating: ANY("'
search_filter += '","'.join(data.rating)
search_filter += '") '
if data.status:
if search_filter:
search_filter += " AND "
search_filter += 'status: ANY("'
search_filter += '","'.join(data.status)
search_filter += '") '
if data.sentiment:
if search_filter:
search_filter += " AND "
search_filter += 'sentiment: ANY("'
search_filter += '","'.join(data.sentiment)
search_filter += '") '
if data.category:
if search_filter:
search_filter += " AND "
search_filter += 'category: ANY("'
search_filter += '","'.join(data.category)
search_filter += '") '
if data.agent_id:
if search_filter:
search_filter += " AND "
search_filter += f'agent_id: ANY("{data.agent_id}") '
if data.customer_id:
if search_filter:
search_filter += " AND "
search_filter += f'customer_id: ANY("{data.customer_id}") '
if data.product_id:
if search_filter:
search_filter += " AND "
search_filter += f'product_id: ANY("{data.product_id}") '
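    # Illustrative result: with rating=["1", "2"] and status=["resolved"],
    # the filter assembled above is roughly
    #   rating: ANY("1","2")  AND status: ANY("resolved")
    # It is not passed to the search yet; see the commented-out
    # search_filter argument below.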
try:
results = utils_search.vertexai_search_multiturn(
search_query=data.query,
conversation_id=conversation_id,
# search_filter=search_filter, # Uncomment when available
datastore_id=config["search-persona5"][
"conversations_datastore_id"
],
)
results = Message.to_dict(results)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error searching Vertex AI datatore. " f"{str(e)}",
) from e
responses = {}
    reply = results.get("reply", {})
    summary = reply.get("summary", {})
    summary_text = summary.get("summary_text", "")
    responses["summary"] = summary_text
    responses["user_input"] = data.query
    responses["search_results"] = []
    search_results = results.get("search_results", [])
    for i in search_results:
        document = i.get("document", {})
        struct_data = document.get("struct_data", {})
responses["search_results"].append(
{
"id": i["id"],
"sentiment": struct_data["sentiment"],
"agent_id": struct_data["agent_id"],
"customer_id": struct_data["customer_id"],
"customer_email": struct_data["customer_email"],
"product_id": struct_data["product_id"],
"category": struct_data["category"],
"rating": struct_data["rating"],
"title": struct_data["title"],
"agent_email": struct_data["agent_email"],
"status": struct_data["status"],
"conversation": struct_data["conversation"],
}
)
return SearchConversationsResponse(
responses=responses, conversation_id=conversation_id
)
@router.post(path="/search-reviews")
def search_reviews(data: SearchReviewsRequest) -> SearchReviewsResponse:
"""
# Search for reviews on Vertex AI Search Datastore
## Request Body [SearchReviewsRequest]:
**query**: *string*
- User input to search the datastore
**user_pseudo_id**: *string*
- User unique ID
**conversation_id**: *string*
- ID of the Vertex AI Search Conversation
**rating**: *list*
    - Filter field for review rating
- Allowed values
- 1
- 2
- 3
- 4
- 5
**sentiment**: *list*
    - Filter field for review sentiment
- Allowed values
- positive
- negative
- neutral
**category**: *list*
    - Filter field for review category
- Allowed values
- Bath Robe
- Bath Towel Set
- Bed
- Bookcase
- Chair
- Console Table
- Dining Table
- Game Table
- Grill
- Office Chair
- Ottoman
- Outdoor Heater
- Pool
- Sofa
- Tool Cabinet
**customer_id**: *string*
- Filter field for review customer_id
**product_id**: *string*
- Filter field for review product_id
## Response Body [SearchReviewsResponse]:
    **responses**: *dict*
- Search results, including information about the review
**conversation_id**: *string*
- ID of the Vertex AI Search Conversation
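    ## Example:
    An illustrative request (IDs are hypothetical):
    ```python
    import requests

    BASE_URL = "http://localhost:8080"  # adjust to your deployment
    payload = {
        "query": "reviews mentioning damaged packaging",
        "user_pseudo_id": "analyst-001",
        "conversation_id": "",
        "rating": ["1"],
        "sentiment": ["negative"],
        "category": ["Bed"],
        "customer_id": "",
        "product_id": "",
    }
    resp = requests.post(f"{BASE_URL}/p5/search-reviews", json=payload)
    resp.raise_for_status()
    for hit in resp.json()["responses"]["search_results"]:
        print(hit["title"], hit["rating"])
    ```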
"""
if not data.conversation_id:
try:
conversation_id = utils_search.create_new_conversation(
user_pseudo_id=data.user_pseudo_id,
datastore_id=config["search-persona5"]["reviews_datastore_id"],
).name
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error creating a Vertex AI Conversation session. "
f"{str(e)}",
) from e
else:
conversation_id = data.conversation_id
search_filter = ""
if data.customer_id:
search_filter += f'customer_id: ANY("{data.customer_id}") '
if data.product_id:
search_filter += f'product_id: ANY("{data.product_id}") '
if data.rating:
search_filter += 'rating: ANY("'
search_filter += '","'.join(data.rating)
search_filter += '") '
if data.sentiment:
search_filter += 'sentiment: ANY("'
search_filter += '","'.join(data.sentiment)
search_filter += '") '
if data.category:
search_filter += 'category: ANY("'
search_filter += '","'.join(data.category)
search_filter += '") '
try:
results = utils_search.vertexai_search_multiturn(
search_query=data.query,
conversation_id=conversation_id,
# search_filter=search_filter, # Uncomment when available
datastore_id=config["search-persona5"]["reviews_datastore_id"],
)
results = Message.to_dict(results)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error searching Vertex AI datatore. " f"{str(e)}",
) from e
responses = {}
    reply = results.get("reply", {})
    summary = reply.get("summary", {})
    summary_text = summary.get("summary_text", "")
    responses["summary"] = summary_text
    responses["user_input"] = data.query
    responses["search_results"] = []
    search_results = results.get("search_results", [])
    for i in search_results:
        document = i.get("document", {})
        struct_data = document.get("struct_data", {})
responses["search_results"].append(
{
"id": i["id"],
"sentiment": struct_data["sentiment"],
"customer_id": struct_data["customer_id"],
"customer_email": struct_data["customer_email"],
"product_id": struct_data["product_id"],
"category": struct_data["category"],
"rating": struct_data["rating"],
"title": struct_data["title"],
"review": struct_data["review"],
}
)
return SearchReviewsResponse(
responses=responses, conversation_id=conversation_id
)
@router.post(path="/vector-find-similar")
def vector_find_similar(
data: VectorFindNeighborRequest,
) -> VectorFindNeighborResponse:
"""
# Find Similar documents using embeddings and Vector Search
## Request Body [VectorFindNeighborRequest]:
**input_text**: *string*
- Input text that will be used as the reference to search
similar documents.
**user_journey**: *string*
- Identifier for conversations or reviews
- Allowed values:
- conversations
- reviews
## Response Body [VectorFindNeighborResponse]:
**similar_vectors**: *list*
- List of similar documents and their metadata
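    ## Example:
    An illustrative request (the base URL is hypothetical):
    ```python
    import requests

    BASE_URL = "http://localhost:8080"  # adjust to your deployment
    payload = {
        "input_text": "The delivery arrived late and the box was damaged",
        "user_journey": "reviews",
    }
    resp = requests.post(f"{BASE_URL}/p5/vector-find-similar", json=payload)
    resp.raise_for_status()
    for doc in resp.json()["similar_vectors"]:
        print(doc)
    ```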
"""
try:
embeddings = utils_palm.get_text_embeddings(data.input_text)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error extracting embeddings from input text. "
f"{str(e)}",
) from e
try:
similar_vectors = utils_vertex_vector.find_neighbor(
feature_vector=embeddings,
persona=5,
user_journey=data.user_journey,
)
similar_vectors = Message.to_dict(similar_vectors)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error calling Vertex AI Vector Search. " f"{str(e)}",
) from e
    nearest_neighbors = similar_vectors.get("nearest_neighbors", [])
    neighbors = (
        nearest_neighbors[0].get("neighbors", []) if nearest_neighbors else []
    )
results = []
firebase_collection = ""
if data.user_journey == "conversations":
firebase_collection = "p5-conversations"
else:
firebase_collection = "p5-reviews"
for i in neighbors:
document = (
db.collection(firebase_collection)
.document(i["datapoint"]["datapoint_id"])
.get()
.to_dict()
)
results.append(document)
return VectorFindNeighborResponse(similar_vectors=results)