backend-apis/app/routers/p6_field_service_agent.py (328 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Persona 6 routers - Field Service Agent """ import json import tomllib from datetime import datetime, timedelta # , timezone from fastapi import APIRouter, HTTPException from google.api_core.datetime_helpers import DatetimeWithNanoseconds from google.api_core.exceptions import GoogleAPICallError from google.cloud import firestore_v1 as firestore from google.cloud.exceptions import NotFound from google.cloud.firestore_v1.base_query import FieldFilter from google.protobuf import timestamp_pb2 from proto import Message from vertexai.preview.generative_models import Image from app.models.p6_model import ( AgentActivity, AskImageRequest, AskImageResponse, Customer, GenerateAgentActivityRequest, GenerateConversationsInsightsRequest, GenerateConversationsInsightsResponse, ScheduleEventRequest, ScheduleEventResponse, SearchManualsRequest, SearchManualsResponse, ) from app.utils import ( utils_cloud_nlp, utils_gemini, utils_imagen, utils_gemini, utils_search, utils_workspace, ) # Load configuration file with open("app/config.toml", "rb") as f: config = tomllib.load(f) firestore_client = firestore.Client() router = APIRouter(prefix="/p6", tags=["P6 - Field Service Agent"]) # ---------------------------------DELETE-------------------------------------# @router.delete(path="/agent-activity/{user_id}/{activity_id}") def delete_agent_activity(user_id: str, activity_id: str) -> str: """ # Delete user activity ## Path parameters **user_id**: *string* - User id **activity_id**: *string* - Activity id ## Returns - ok ## Raises **HTTPException** - *400* - Error deleting in Firestore - Firestore could not delete the activity """ try: firestore_client.collection("field-agent").document( user_id ).collection("activities").document(activity_id).delete() except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error deleting in Firestore" + str(e) ) from e return "ok" # ---------------------------------GET----------------------------------------# @router.get(path="/customer/{customer_id}") def get_customer_info(customer_id: str) -> Customer: """ # Retrieve customer information from Firestore - CDP data - Conversations - Reviews ## Request Parameters: **customer_id**: *string* - Unique identifier of the customer ## Customer: **conversations**: *list* - List of all the conversations that customer had with the Call Center **reviews**: *list* - List of all the reviews submited by that customer **customer_info**: *dict* - Information about that customer extracted from the CDP """ customer_info_snapshot = ( firestore_client.collection( config["search-persona5"]["firestore_customers"] ) .document(customer_id) .get() ) if customer_info_snapshot: customer_info = customer_info_snapshot.to_dict() or {} else: raise HTTPException(status_code=400, detail="Customer ID not found.") conversations = ( firestore_client.collection( config["search-persona5"]["firestore_conversations"] ) .where(filter=FieldFilter("customer_id", "==", customer_id)) .get() ) conversations_list = [] for conversation in conversations: conversations_list.append(conversation.to_dict()) reviews = ( firestore_client.collection( config["search-persona5"]["firestore_reviews"] ) .where(filter=FieldFilter("customer_id", "==", customer_id)) .get() ) reviews_list = [] for review in reviews: reviews_list.append(review.to_dict()) return Customer( conversations=conversations_list, reviews=reviews_list, customer_info=customer_info, ) # ---------------------------------POST---------------------------------------# @router.post(path="/agent-activity/{user_id}") def add_agent_activity(user_id: str, activity: AgentActivity) -> str: """ # add agent activity ## path parameters **user_id**: *string* - user id ## agentactivity **title**: *string* - title of the activity **description**: *string* - description of the activity **customer_id**: *string* - customer id ## returns - ok ## raises **httpexception** - *400* - error setting in firestore - firestore could not set the activity """ try: firestore_client.collection("field-agent").document( user_id ).collection("activities").document().set( { **activity.model_dump(), "timestamp": DatetimeWithNanoseconds.from_timestamp_pb( timestamp_pb2.Timestamp( seconds=activity.timestamp["seconds"], nanos=activity.timestamp["nanoseconds"], ) ), } ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error setting in Firestore" + str(e) ) from e return "ok" @router.post(path="/ask-image-gemini") def ask_image_gemini(data: AskImageRequest) -> AskImageResponse: """ # Multimodal generate with text and image ## Request body [AskImageRequest] **image_name**: *string* - Image name to be collected from Google Cloud Storage /images **user_query**: *string* - User query ## Response body [AskImageResponse] **response**: *str* - Generated text ## Raises **HTTPException** - *400* - Image name not provided **HTTPException** - *400* - Image not found in Cloud Storage **HTTPException** - *400* - Error generating response from Gemini """ if not data.image_name: raise HTTPException( status_code=400, detail="Provide at least one image to generate the categories.", ) try: base_image = Image.from_bytes( data=utils_imagen.image_name_to_bytes(image_name=data.image_name) ) except NotFound as e: raise HTTPException( status_code=404, detail="Image not found in Cloud Storage " + str(e), ) from e try: gemini_response = utils_gemini.generate_gemini_pro_vision( [data.user_query, base_image] ) response = gemini_response.candidates[0].content.parts[0].text except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error generating response from Gemini " + str(e), ) from e return AskImageResponse(response=response) @router.post(path="/generate-agent-activity") def generate_agent_activity(data: GenerateAgentActivityRequest) -> str: """ # Generate agent activity ## Request body [GenerateActivityRequest] **user_id**: *string* - User id **customer_id**: *string* - Customer id **conversation**: *string* - Chat conversation with virtual agent **date_time**: *string* - Date time for the activity ## Returns - ok """ try: response_palm = utils_gemini.generate_gemini_pro_text( prompt=config["field_service_agent"][ "prompt_agent_activity" ].format(data.conversation) ).replace("</output>", "") response_palm = response_palm.replace("```json", "").replace("```", "") print(f"=>{response_palm}") response = json.loads(response_palm) except Exception as e: raise HTTPException( status_code=400, detail="Error generating title / description with PaLM" + str(e), ) from e agent_activity = AgentActivity( title=response["title"], description=response["description"], customer_id=data.customer_id, timestamp=data.timestamp, ) return add_agent_activity(data.user_id, agent_activity) @router.post(path="/generate-conversations-insights") def generate_insights_conversations( data: GenerateConversationsInsightsRequest, ) -> GenerateConversationsInsightsResponse: """ # Generate insights from conversations. - Summary - Insights (what went good/not good) - Pending tasks - Next best action ## Request Body [GenerateConversationsInsightsRequest]: **conversations**: *list* - Conversations to generate the insights from ## Response Body [GenerateConversationsInsightsResponse]: **summary**: *string* - Summary of the conversations **entities**: *list* - Entities extracted with Cloud NL API **insights**: *string* - Insights from the conversations **pending_tasks**: *string* - Pending tasks from the conversations **next_best_action**: *string* - Next best action extracted from the conversations """ if len(data.conversations) > 1: prompt_summary = config["search-persona5"][ "prompt_summary_multi_conversations" ] prompt_insights = config["search-persona5"][ "prompt_insights_multi_conversations" ] prompt_tasks = config["search-persona5"][ "prompt_pending_tasks_multi_conversations" ] prompt_nbs = config["search-persona5"][ "prompt_nbs_multi_conversations" ] else: prompt_summary = config["search-persona5"][ "prompt_summary_conversation" ] prompt_insights = config["search-persona5"][ "prompt_insights_conversation" ] prompt_tasks = config["search-persona5"][ "prompt_pending_tasks_conversation" ] prompt_nbs = config["search-persona5"]["prompt_nbs_conversation"] input_text = json.dumps({"conversations": data.conversations}) try: summary = utils_gemini.generate_gemini_pro_text( prompt=prompt_summary.format(input_text) ) insights = utils_gemini.generate_gemini_pro_text( prompt=prompt_insights.format(input_text) ) pending_tasks = utils_gemini.generate_gemini_pro_text( prompt=prompt_tasks.format(input_text) ) next_best_action = utils_gemini.generate_gemini_pro_text( prompt=prompt_nbs.format(input_text) ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail=f"Error calling Vertex AI PaLM API. " f"{str(e)}", ) from e try: entities = utils_cloud_nlp.nlp_analyze_entities(input_text) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail=f"Error calling Google Cloud NL API. " f"{str(e)}", ) from e return GenerateConversationsInsightsResponse( summary=summary, entities=entities, insights=insights, pending_tasks=pending_tasks, next_best_action=next_best_action, ) @router.post(path="/search-manuals") def search_manuals( data: SearchManualsRequest, ) -> SearchManualsResponse: """ # Search for conversations on Vertex AI Search Datastore ## Request Body [SearchConversationsRequest]: **query**: *string* - User input to search the datastore **user_pseudo_id**: *string* - User unique ID **category**: *list* - Filter field for manuals category - Allowed values - Bath Robe - Bath Towel Set - Bed - Bookcase - Chair - Console Table - Dining Table - Game Table - Grill - Office Chair - Ottoman - Outdoor Heater - Pool - Sofa - Tool Cabinet ## Response Body [SearchConversationsResponse]: **responses**: *dictionary* - Search results, including information about the conversation """ search_filter = "" if data.category: search_filter += 'category: ANY("' search_filter += '","'.join(data.category) search_filter += '") ' try: search_response = utils_search.vertexai_search_oneturn( search_query=data.query, summary_result_count=5, search_filter=search_filter, datastore_id=config["search-persona5"][ "product_manuals_datastore_id" ], ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail=f"Error searching Vertex AI datatore. " f"{str(e)}", ) from e responses = {} responses["summary"] = search_response.summary.summary_text responses["user_input"] = data.query responses["search_results"] = [] for result in search_response.results: search_result_dict = Message.to_dict(result) document = search_result_dict.get("document", {}) derived_struct_data = document.get("derived_struct_data", {}) if len(derived_struct_data.get("snippets", [])) > 0: struct_data = document.get("struct_data", {}) responses["search_results"].append( { "id": search_result_dict.get("id"), "snippet": derived_struct_data["snippets"][0]["snippet"], "link": derived_struct_data["link"], "title": struct_data["title"], "category": struct_data["category"], "manual": struct_data["manual"], } ) return SearchManualsResponse(responses=responses) @router.post(path="/schedule-event") def schedule_event(data: ScheduleEventRequest) -> ScheduleEventResponse: """ # Creates an event using Calendar API with Google Meet ## Request body for schedule-event **event_summary**: *string* - Event summary **attendees**: *list* - List of attendees **start_time**: *string* - Start time **end_time**: *string* - End time ## Response body for schedule-event **conference_call_link**: *string* - Conference call link **icon_url**: *string* - Icon URL **start_time_iso**: *string* - Start time ISO **end_time_iso**: *string* - End time ISO ## Raises **HTTPException** - *400* - Error """ try: event_summary = "Cymbal Support: Your Event Has Been Scheduled!" start_date = datetime.fromisoformat(data.start_time) end_date = ( datetime.fromisoformat(data.start_time) + timedelta(minutes=60) ).isoformat() result_dict = utils_workspace.create_calendar_event( event_summary=event_summary, attendees=data.attendees, start_date=start_date.isoformat(), end_date=end_date, ) except Exception as e: print(f"ERROR : Calendar Event Creation Failed : {e}") raise HTTPException(status_code=400, detail=str(e)) from e calendar_link = ( "https://calendar.google.com/" f"calendar/u/0/r/day/{start_date.year}/" f"{start_date.month}/{start_date.day}" ) email_response_html = f"""<html><body><p>Dear Customer,</p> <p>Thank you for contacting the support team.</p> <p>Your event has been scheduled, and one of our experts will be at your location on the date and time indicated below.</p> <p><b>Date: </b>{start_date.strftime('%m/%d/%Y, %H:%M:%S')} UTC</p> <p>Link to your calendar appointment: \ <a href="{calendar_link}">Calendar</a></p> <p>Best Regards,<br>You support team</p>""" for attendee in data.attendees: try: utils_workspace.send_email_single_thread( email_response_html=email_response_html, destination_email_address=attendee, email_subject=event_summary, ) except Exception as e: raise HTTPException( status_code=400, detail="Could not send the email. " + str(e) ) from e return ScheduleEventResponse( conference_call_link=result_dict["hangoutLink"], calendar_link=calendar_link, icon_url=result_dict["conferenceData"]["conferenceSolution"][ "iconUri" ], start_time_iso=result_dict["start"]["dateTime"], end_time_iso=result_dict["end"]["dateTime"], ) @router.put("/agent-activity/{user_id}/{activity_id}") def put_agent_activity( user_id: str, activity_id: str, activity: AgentActivity ) -> str: """ # Put agent activity ## path parameters **user_id**: *string* - user id **activity_id**: *string* - activity id ## agentactivity **title**: *string* - title of the activity **description**: *string* - description of the activity **customer_id**: *string* - customer id ## returns - ok ## raises **httpexception** - *400* - error setting in firestore - firestore could not set the activity """ try: firestore_client.collection("field-agent").document( user_id ).collection("activities").document(activity_id).set( { **activity.model_dump(), "timestamp": DatetimeWithNanoseconds.from_timestamp_pb( timestamp_pb2.Timestamp( seconds=activity.timestamp["seconds"], nanos=activity.timestamp["nanoseconds"], ) ), } ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error setting in Firestore" + str(e) ) from e return "ok"