backend-apis/app/routers/p4_customer_service_agent.py (340 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Persona 4 routers - Customer Service Agent
"""
import asyncio
import json
import tomllib
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import firestore
from google.protobuf import timestamp_pb2
from proto import Message
from app.models.p4_model import (
AddMessageResponse,
ChatMessage,
ConversationSummaryAndTitleResponse,
RephraseTextRequest,
RephraseTextResponse,
ScheduleEventRequest,
ScheduleEventResponse,
SearchConversationsRequest,
SearchConversationsResponse,
SearchManualsRequest,
SearchManualsResponse,
TranslateRequest,
TranslateResponse,
AutoSuggestRequest,
AutoSuggestResponse,
)
from app.utils import (
utils_cloud_translation,
utils_gemini,
utils_search,
utils_workspace,
)
# Imports the Google Cloud language client library
from google.cloud import language_v1
# Instantiates a client
lang_client = language_v1.LanguageServiceClient()
# Load configuration file
with open("app/config.toml", "rb") as f:
config = tomllib.load(f)
# Global configuration
project_id = config["global"]["project_id"]
# Prompts for Salesforce
sf_summarize_prompt_template = config["salesforce"][
"sf_summarize_prompt_template"
]
chat_summarize_prompt_template = config["salesforce"][
"chat_summarize_prompt_template"
]
chat_title_prompt_template = config["salesforce"]["chat_title_prompt_template"]
rephrase_prompt_template = config["salesforce"]["rephrase_prompt_template"]
auto_suggest_prompt_template = config["salesforce"]["auto_suggest_prompt_template"]
router = APIRouter(prefix="/p4", tags=["P4 - Customer Service Agent"])
db = firestore.Client()
# ---------------------------------GET---------------------------------------#
@router.get(path="/conversation_summary_and_title/{user_id}/{conversation_id}")
def conversation_summary_and_title(
user_id: str, conversation_id: str
) -> ConversationSummaryAndTitleResponse:
"""
# End conversation and create summary
## Request parameters
**user_id**: *string*
- User Id
**conversation_id**: *string*
- Conversation Id
## Response body [ConversationSummaryAndTitleResponse]
**summary**: *string*
- Conversation Summary
**title** *string*
- Conversation Title
## Raises
**HTTPException** - *500* - Error
- Query conversation error
**HTTPException** - *404* - Error
- No conversation found
**HTTPException** - *500* - Error
- Summarization error
"""
try:
conversation_messages_snapshot = [
message_snapshot.to_dict()
for message_snapshot in db.collection("p4-conversations")
.document(user_id)
.collection("conversations")
.document(conversation_id)
.collection("messages")
.get()
]
except GoogleAPICallError as e:
print(f"[Error]query_conversation:{e}")
raise HTTPException(status_code=500, detail=str(e)) from e
summary = "Empty conversation."
title = "Empty conversation."
if conversation_messages_snapshot:
try:
conversation_str = json.dumps(
{"messages": conversation_messages_snapshot},
default=str,
)
summary, title = asyncio.run(
utils_gemini.run_predict_text_llm(
prompts=[
chat_summarize_prompt_template.format(
conversation_str
),
chat_title_prompt_template.format(conversation_str),
]
)
)
if not summary:
summary = "Closed case"
if not title:
title = "Closed case"
except GoogleAPICallError as e:
print(f"[Error]VertexSummarizeChat:{e}")
raise HTTPException(status_code=500, detail=str(e)) from e
try:
db.collection("p4-conversations").document(user_id).collection(
"conversations"
).document(conversation_id).update(
{"title": title, "summary": summary}
)
except GoogleAPICallError as e:
print(e)
raise HTTPException(status_code=500, detail=str(e)) from e
return ConversationSummaryAndTitleResponse(summary=summary, title=title)
# ---------------------------------POST---------------------------------------#
@router.post(path="/message/{user_id}/{conversation_id}")
def add_message(
user_id: str, conversation_id: str, message: ChatMessage
) -> AddMessageResponse:
"""
# Add message to conversation
## Path parameters
**user_id**: *string*
- Conversation Id
**conversation_id**: *string*
- Conversation Id
- If "new", creates a new conversation id
## ChatMessage
**text**: *string*
- Text to send
**author**: *string*
- Author
- Allowed values
- User
- Agent
- System
**language**: *string*
- Language
## Response body for send-message
**conversation_id**: *string*
- Conversation Id
"""
if conversation_id == "new":
try:
conversation_doc: tuple[
timestamp_pb2.Timestamp, firestore.DocumentReference
] = (
db.collection("p4-conversations")
.document(user_id)
.collection("conversations")
.add({"timestamp": datetime.now(tz=timezone.utc)})
)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400, detail="Error setting in Firestore" + str(e)
) from e
conversation_id = conversation_doc[1].id
try:
document = language_v1.types.Document(
content=message.text, type_=language_v1.types.Document.Type.PLAIN_TEXT
)
# Detects the sentiment of the text
sentiment = lang_client.analyze_sentiment(
request={"document": document}
).document_sentiment
sentiment_score = sentiment.score
sentiment_magnitude = sentiment.magnitude
except:
sentiment_score = 0
sentiment_magnitude = 0
db.collection("p4-conversations").document(user_id).collection(
"conversations"
).document(conversation_id).collection("messages").add(
{
"author": message.author,
"text": message.text,
"timestamp": datetime.now(tz=timezone.utc),
"language": message.language,
"link": message.link,
"iconURL": message.iconURL,
"sentiment_score":sentiment_score,
"sentiment_magnitude": sentiment_magnitude
}
)
return AddMessageResponse(conversation_id=conversation_id)
# ---------------------------------DELETE---------------------------------------#
@router.delete(path="/clear_conversations/{user_id}")
def clear_all_conversations(
user_id: str
):
"""
# Add message to conversation
## Path parameters
**user_id**: *string*
"""
conv_ref = db.collection("p4-conversations").document(user_id).collection(
"conversations"
)
conv_docs = conv_ref.list_documents()
for doc in conv_docs:
msg_docs = doc.collection("messages").list_documents()
for msg in msg_docs:
msg.delete()
print("Deleting Conversation)")
doc.delete()
return JSONResponse(
content={'message': 'Successfully Deleted Conversations'},
status_code=200
)
@router.post(path="/auto-suggest-query")
def auto_suggest_query_text(
data: AutoSuggestRequest,
) -> AutoSuggestResponse:
"""
# Form Query for a given text.
## Request body for given-text
**input_text**: *string*
- Conversation Text
## Response body for query-text
**output_text**: *string*
- Query text
"""
try:
llm_response = utils_gemini.generate_gemini_pro_text(
prompt=auto_suggest_prompt_template.format(data.input_text)
)
except:
llm_response = "No suggestions for now"
return AutoSuggestResponse(output_text=llm_response)
@router.post(path="/rephrase-text")
def rephrase_text(
data: RephraseTextRequest,
) -> RephraseTextResponse:
"""
# Rephrase a given text.
## Request body for rephrase-text
**rephrase_text_input**: *string*
- Text to rephrase
## Response body for rephrase-text
**rephrase_text_output**: *string*
- Rephrased text
"""
llm_response = utils_gemini.generate_gemini_pro_text(
prompt=rephrase_prompt_template.format(data.rephrase_text_input)
)
return RephraseTextResponse(rephrase_text_output=llm_response)
@router.post(path="/schedule-event")
def schedule_event(data: ScheduleEventRequest) -> ScheduleEventResponse:
"""
# Creates an event using Calendar API with Google Meet
## Request body for schedule-event
**event_summary**: *string*
- Event summary
**attendees**: *list*
- List of attendees
**start_time**: *string*
- Start time
**end_time**: *string*
- End time
## Response body for schedule-event
**conference_call_link**: *string*
- Conference call link
**icon_url**: *string*
- Icon URL
**start_time_iso**: *string*
- Start time ISO
**end_time_iso**: *string*
- End time ISO
## Raises
**HTTPException** - *400* - Error
- Calendar Event Creation Failed
"""
try:
start_date = datetime.fromisoformat(data.start_time).isoformat()
end_date = (
datetime.fromisoformat(data.start_time) + timedelta(minutes=30)
).isoformat()
result_dict = utils_workspace.create_calendar_event(
event_summary=data.event_summary,
attendees=data.attendees,
start_date=start_date,
end_date=end_date,
)
except Exception as e:
print(f"ERROR : Calendar Event Creation Failed : {e}")
raise HTTPException(status_code=400, detail=str(e)) from e
return ScheduleEventResponse(
conference_call_link=result_dict["hangoutLink"],
icon_url=result_dict["conferenceData"]["conferenceSolution"][
"iconUri"
],
start_time_iso=result_dict["start"]["dateTime"],
end_time_iso=result_dict["end"]["dateTime"],
)
@router.post(path="/search-conversations")
def search_conversations(
data: SearchConversationsRequest,
) -> SearchConversationsResponse:
"""
# Search for conversations on Vertex AI Search Datastore
## Request Body [SearchConversationsRequest]:
**query**: *string*
- User input to search the datastore
**user_pseudo_id**: *string*
- User unique ID
**rating**: *list*
- Filter field for conversation rating
- Allowed values
- 1
- 2
- 3
- 4
- 5
**status**: *list*
- Filter field for conversation status
- Allowed values
- resolved
- not resolved
**sentiment**: *list*
- Filter field for conversation sentiment
- Allowed values
- positive
- negative
- neutral
**category**: *list*
- Filter field for conversation category
- Allowed values
- Bath Robe
- Bath Towel Set
- Bed
- Bookcase
- Chair
- Console Table
- Dining Table
- Game Table
- Grill
- Office Chair
- Ottoman
- Outdoor Heater
- Pool
- Sofa
- Tool Cabinet
**agent_id**: *string*
- Filter field for conversation agent_id
**customer_id**: *string*
- Filter field for conversation customer_id
**product_id**: *string*
- Filter field for conversation product_id
## Response Body [SearchConversationsResponse]:
**responses**: *dictionary*
- Search results, including information about the conversation
"""
search_filter = ""
if data.rating:
search_filter += 'rating: ANY("'
search_filter += '","'.join(data.rating)
search_filter += '") '
if data.status:
if search_filter:
search_filter += " AND "
search_filter += 'status: ANY("'
search_filter += '","'.join(data.status)
search_filter += '") '
if data.sentiment:
if search_filter:
search_filter += " AND "
search_filter += 'sentiment: ANY("'
search_filter += '","'.join(data.sentiment)
search_filter += '") '
if data.category:
if search_filter:
search_filter += " AND "
search_filter += 'category: ANY("'
search_filter += '","'.join(data.category)
search_filter += '") '
if data.agent_id:
if search_filter:
search_filter += " AND "
search_filter += f'agent_id: ANY("{data.agent_id}") '
if data.customer_id:
if search_filter:
search_filter += " AND "
search_filter += f'customer_id: ANY("{data.customer_id}") '
if data.product_id:
if search_filter:
search_filter += " AND "
search_filter += f'product_id: ANY("{data.product_id}") '
try:
search_response = utils_search.vertexai_search_oneturn(
search_query=data.query,
summary_result_count=5,
search_filter=search_filter,
datastore_id=config["search-persona5"][
"conversations_datastore_id"
],
)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error searching Vertex AI datatore. " f"{str(e)}",
) from e
responses = {}
responses["summary"] = search_response.summary.summary_text
responses["user_input"] = data.query
responses["search_results"] = []
for result in search_response.results:
search_result_dict = Message.to_dict(result)
document = search_result_dict.get("document", {})
derived_struct_data = document.get("derived_struct_data", {})
if len(derived_struct_data.get("snippets", [])) > 0:
struct_data = document.get("struct_data", {})
responses["search_results"].append(
{
"snippet": derived_struct_data["snippets"][0]["snippet"],
"link": derived_struct_data["link"],
"id": search_result_dict.get("id"),
"title": struct_data["title"],
"status": struct_data["status"],
"sentiment": struct_data["sentiment"],
"rating": struct_data["rating"],
"product_id": struct_data["product_id"],
"customer_id": struct_data["customer_id"],
"customer_email": struct_data["customer_email"],
"conversation": struct_data["conversation"],
"category": struct_data["category"],
"agent_id": struct_data["agent_id"],
"agent_email": struct_data["agent_email"],
}
)
return SearchConversationsResponse(responses=responses)
@router.post(path="/search-manuals")
def search_manuals(
data: SearchManualsRequest,
) -> SearchManualsResponse:
"""
# Search for conversations on Vertex AI Search Datastore
## Request Body [SearchConversationsRequest]:
**query**: *string*
- User input to search the datastore
**user_pseudo_id**: *string*
- User unique ID
**category**: *list*
- Filter field for manuals category
- Allowed values
- Bath Robe
- Bath Towel Set
- Bed
- Bookcase
- Chair
- Console Table
- Dining Table
- Game Table
- Grill
- Office Chair
- Ottoman
- Outdoor Heater
- Pool
- Sofa
- Tool Cabinet
## Response Body [SearchConversationsResponse]:
**responses**: *dictionary*
- Search results, including information about the conversation
"""
search_filter = ""
if data.category:
search_filter += 'category: ANY("'
search_filter += '","'.join(data.category)
search_filter += '") '
try:
search_response = utils_search.vertexai_search_oneturn(
search_query=data.query,
summary_result_count=5,
search_filter=search_filter,
datastore_id=config["search-persona5"][
"product_manuals_datastore_id"
],
)
except GoogleAPICallError as e:
raise HTTPException(
status_code=400,
detail=f"Error searching Vertex AI datatore. " f"{str(e)}",
) from e
responses = {}
responses["summary"] = search_response.summary.summary_text
responses["user_input"] = data.query
responses["search_results"] = []
for result in search_response.results:
search_result_dict = Message.to_dict(result)
document = search_result_dict.get("document", {})
derived_struct_data = document.get("derived_struct_data", {})
if len(derived_struct_data.get("snippets", [])) > 0:
struct_data = document.get("struct_data", {})
responses["search_results"].append(
{
"id": search_result_dict.get("id"),
"snippet": derived_struct_data["snippets"][0]["snippet"],
"link": derived_struct_data["link"],
"title": struct_data["title"],
"category": struct_data["category"],
"manual": struct_data["manual"],
}
)
return SearchManualsResponse(responses=responses)
@router.post(path="/translate")
def translate(data: TranslateRequest) -> TranslateResponse:
"""
# Translate text using Cloud Translation API
## Request body for translate
**input_text**: *string*
- Text to translate
**target_language**: *string*
- Target language
## Response body for translate
**output_text**: *string*
- Translated text
"""
translated_text = utils_cloud_translation.translate_text_cloud_api(
input_text=data.input_text, target_language=data.target_language
)
return TranslateResponse(output_text=translated_text)