backend-apis/app/routers/p1_customer.py (584 lines of code) (raw):

# Copyrightll 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Persona 1 routers - Customer """ import base64 import json import tomllib from fastapi import APIRouter, HTTPException from fastapi.responses import HTMLResponse from google.api_core.exceptions import GoogleAPICallError from google.cloud import firestore, pubsub_v1 from google.cloud import translate_v2 as translate from proto import Message from app.models.p1_model import ( CollectRecommendationsEventsRequest, CompareProductsRequest, GetProductResponse, GetProductSummaryResponse, GetReviewsResponse, GetReviewsSummaryResponse, InitiateVertexAIRecommendationsRequest, InitiateVertexAIRecommendationsResponse, InitiateVertexAISearchRequest, InitiateVertexAISearchResponse, OrderRequest, SalesforceEmailSupportRequest, SalesforceEmailSupportResponse, SendEmailFromDocsRequest, TranslateTextRequest, TranslateTextResponse, TriggerRecommendationEmailRequest, TriggerUnstructuredRecommendationsRequest, TriggerUnstructuredSearchRequest, ) from app.utils import ( utils_cloud_sql, utils_gemini, utils_recommendations, utils_salesforce, utils_search, utils_workspace, ) # ----------------------------------------------------------------------------# # Load configuration file (config.toml) and global configs with open("app/config.toml", "rb") as f: config = tomllib.load(f) # ----------------------------------------------------------------------------# # ----------------------------------------------------------------------------# # Clients db = firestore.Client() publisher = pubsub_v1.PublisherClient() translate_client = translate.Client() # ----------------------------------------------------------------------------# # ----------------------------------------------------------------------------# # Pubsub Topics project_id = config["global"]["project_id"] website_topic_id = config["website_search"]["website_topic_id"] website_topic_path = publisher.topic_path(project_id, website_topic_id) recommendations_topic_id = config["recommendations"][ "recommendations_topic_id" ] email_recommendation_topic_id = config["recommendations"][ "email_recommendation_topic_id" ] email_template = config["recommendations"][ "email_template" ] recommendations_topic_path = publisher.topic_path( project_id, recommendations_topic_id ) email_recommendation_topic_path = publisher.topic_path( project_id, email_recommendation_topic_id ) # ----------------------------------------------------------------------------# router = APIRouter(prefix="/p1", tags=["P1 - Customer"]) # ----------------------------------GET---------------------------------------# @router.get( path="/get-product/{product_id}", response_model=GetProductResponse ) def get_product(product_id: int) -> dict: """ # Get Product ## Path parameters **product_id**: *string* - Product id ## Response body [GetProductResponse] **id**: *int* - Product id **title**: *string* - Product title **description** : *string* - Product description **image**: *string* - Product image **features**: *list* - Product features **categories**: *list* - Product categories **price**: *float* - Product price **quantity**: *int* - Product quantity ## Raises **HTTPException** - *400* - Cloud SQL Error - Error connecting to Cloud SQL **HTTPException** - *404* - Product not found - Product was not found in the Cloud SQL database """ try: product = utils_cloud_sql.get_product(product_id) except Exception as e: raise HTTPException(status_code=400, detail="Cloud SQL error.") from e if product: return utils_cloud_sql.convert_product_to_dict(product) raise HTTPException(status_code=404, detail="Product not found.") @router.get( path="/get-reviews/{product_id}", response_model=GetReviewsResponse ) def get_reviews(product_id: str) -> GetReviewsResponse: """ # Get Reviews ## Path parameters **product_id**: *string* - Product id to get the reviews ## Response body [GetReviewsResponse] **reviews**: *list[ReviewDoc]* - List of reviews for the product ## ReviewDoc **review**: *string* - Product review **sentiment**: *string* - Review sentiment **stars**: *int* - Review stars for the product ## Raises **HTTPException** - *400* - Error getting from Firestore - Firestore could not return the reviews """ try: reviews: list[GetReviewsResponse.ReviewDoc] = [ review_snapshot.to_dict() for review_snapshot in db.collection("website_reviews") .document(product_id) .collection("reviews") .get() ] except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error getting from Firestore" + str(e) ) from e return GetReviewsResponse(reviews=reviews) @router.get( path="/get-reviews-summary/{product_id}", response_model=GetReviewsSummaryResponse, ) def get_reviews_summary(product_id: int) -> GetReviewsSummaryResponse: """ # Get reviews summary ## Path parameters **product_id**: *string* - Product id to get the reviews ## Response body [GetReviewsSummaryResponse] **reviews_summary**: *string* - Reviews summary for the product ## Raises **HTTPException** - *400* - Cloud SQL Error - Error connecting to Cloud SQL **HTTPException** - *404* - Product not found - Product was not found in the Cloud SQL database **HTTPException** - 400 - Error getting reviews - Firestore could not return the reviews **HTTPException** - 400 - Error generating reviews summary - PaLM could not generate the summary """ try: product = utils_cloud_sql.get_product(product_id) except Exception as e: raise HTTPException( status_code=400, detail="Cloud SQL error." + str(e) ) from e if not product: raise HTTPException(status_code=404, detail="Product not found.") product_dict = utils_cloud_sql.convert_product_to_dict(product) try: reviews: list[GetReviewsResponse.ReviewDoc] = [ review_snapshot.to_dict() for review_snapshot in db.collection("website_reviews") .document(str(product_id)) .collection("reviews") .get() ] except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error getting reviews " + str(e) ) from e summary = "No reviews yet." if reviews: reviews_json = json.dumps( { "product": product_dict, "reviews": reviews, } ) try: summary = utils_gemini.generate_gemini_pro_text( prompt=config["summary"]["prompt_reviews"].format( reviews=reviews_json ), max_output_tokens=1024, temperature=0.2, top_k=40, top_p=0.8, ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error generating reviews summary" + str(e), ) from e return GetReviewsSummaryResponse(reviews_summary=summary) @router.get( path="/get-product-summary/{product_id}", response_model=GetProductSummaryResponse, ) def get_product_summary(product_id: int) -> GetProductSummaryResponse: """ # Get Product Summary ## Path parameters **product_id**: *string* - Product id ## Response body for [GetProductSummaryResponse] **product_summary**: *string* - Product summary for the product ## Raises **HTTPException** - *400* - Cloud SQL Error - Error connecting to Cloud SQL **HTTPException** - *404* - Product not found - Product was not found in the Cloud SQL database **HTTPException** - *400* - Error generating the summary - PaLM could not generate the summary """ try: product = utils_cloud_sql.get_product(product_id) except Exception as e: raise HTTPException( status_code=400, detail="Cloud SQL error." + str(e) ) from e if not product: raise HTTPException(status_code=404, detail="Product not found.") product_dict = utils_cloud_sql.convert_product_to_dict(product) try: product_json = json.dumps(product_dict) summary = utils_gemini.generate_gemini_pro_text( prompt=config["summary"]["prompt_product"].format( product=product_json ), max_output_tokens=1024, temperature=0.2, top_k=40, top_p=0.8, ) return GetProductSummaryResponse(product_summary=summary) except Exception as e: raise HTTPException( status_code=400, detail="Error generating the summary " + str(e) ) from e # ---------------------------------POST---------------------------------------# @router.post(path="/collect-recommendations-events") def collect_recommendations_events(data: CollectRecommendationsEventsRequest): """ # Collect recommendations events ## Request body [CollectRecommendationsEventsRequest] **event_type**: *string* - Event type - Valid types: - search - view-item - view-item-list - view-home-page - view-category-page - add-to-cart - purchase - media-play - media-complete **user_pseudo_id**: *string* - User pseudo id **documents**: *list[string]* - List of documents ids **optional_user_event_fields**: *dict* - Optional user event fields ## Raises **HTTPException** - *400* - Error collecting event - Error collecting event to recommendations """ try: utils_recommendations.collect_events( data.event_type, data.user_pseudo_id, data.documents, data.optional_user_event_fields, ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error collecting event. " + str(e), ) from e return "ok" @router.post(path="/compare-products", response_class=HTMLResponse) def compare_products(data: CompareProductsRequest) -> HTMLResponse: """ # Compare Products ## Request body [CompareProductsRequest] **products**: *list[string]* - List of products ids ## Returns: **HTMLResponse** - A HTML comparison table ## Raises: **HTTPException** - *400* - Error Generating the table - PaLM got an error generating the table """ try: for i in data.products: print("Product Description------") print(i) product_1 = json.loads(data.products[0]) del product_1["image"] del product_1["quantity"] del product_1["id"] del product_1["features"] del product_1["categories"] product_2 = json.loads(data.products[1]) del product_2["image"] del product_2["quantity"] del product_2["id"] del product_2["features"] del product_2["categories"] comparison = utils_gemini.generate_gemini_pro_text( prompt=config["compare"]["prompt_compare"].format( product_title_1=product_1["title"], product_description_1=product_1["description"], product_title_2=product_2["title"], product_description_2=product_2["description"] ), temperature=0.2, top_k=40, top_p=0.8 ) return HTMLResponse(content=comparison) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error generating the HTML. " + str(e) ) from e @router.post(path="/initiate-vertexai-recommendations") def initiate_vertexai_recommendations( data: InitiateVertexAIRecommendationsRequest, ) -> InitiateVertexAIRecommendationsResponse: """ # Initiate Vertex AI Recommendations ## Request body [InitiateVertexAIRecommendationsRequest] **recommendation_type**: *string* - Recommendation type - Valid types: - recommended-for-you - others-you-may-like - more-like-this - most-popular-items - new-and-featured **event_type**: *string* - Event type - Valid types: - search - view-item - view-item-list - view-home-page - view-category-page - add-to-cart - purchase - media-play - media-complete **user_pseudo_id**: *string* - User pseudo-id **documents**: *list[string]* - List of document ids **optional_user_event_fields**: *dict* - Optional user event fields ## Response body [InitiateVertexAIRecommendationsResponse] **recommendations_doc_id**: *string* - Firestore document id for recommendations ## Raises: **HTTPException** - *400* - Error creating firestore document **HTTPException** - *400* - Error publishing message to Pubsub """ try: recommendations_doc = db.collection( "website_recommendations" ).document() recommendations_doc.set( { "event_type": data.event_type, "user_pseudo_id": data.user_pseudo_id, "documents": data.documents, "recommendations_type": data.recommendation_type, "optional_user_event_fields": data.optional_user_event_fields, "recommendations": [], } ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error creating Firestore document. " + str(e), ) from e try: payload = json.dumps( { "event_type": data.event_type, "user_pseudo_id": data.user_pseudo_id, "documents": data.documents, "recommendations_type": data.recommendation_type, "optional_user_event_fields": data.optional_user_event_fields, "recommendations_doc_id": recommendations_doc.id, } ).encode("utf-8") future = publisher.publish(recommendations_topic_path, payload) future.result() except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error publishing the message to pubsub. " + str(e), ) from e return InitiateVertexAIRecommendationsResponse( recommendations_doc_id=recommendations_doc.id ) @router.post(path="/initiate-vertexai-search") def initiate_vertexai_search( data: InitiateVertexAISearchRequest, ) -> InitiateVertexAISearchResponse: """ # Initiate Vertex AI Search ## Request body [InitiateVertexAISearchRequest] **visitor_id**: *string* - Visitor id **query**: *string* - Search query **image**: *string* - Image url **search_doc_id**: *string* - Firestore document id for search. - If it is the first interaction, this is empty. **user_id**: *string* - User id ## Response body [InitiateVertexAISearchResponse] **search_doc_id**: *string* - Firestore document id for search ## Raises: **HTTPException** - *400* - Provide a text and/or an image to search **HTTPException** - *400* - Error creating firestore document **HTTPException** - *400* - Error publishing message to Pubsub """ if not data.image and not data.query: raise HTTPException( status_code=400, detail="Provide a text and/or an image to search." ) user_query = "" if data.query and data.image: user_query = ( f"Show items similar to this image and description. " f"Description: {data.query}" ) elif not data.query and data.image: user_query = "Show items similar to this image." elif data.query and not data.image: user_query = data.query try: search_doc = db.collection("website_search").document( data.search_doc_id or None ) if not data.search_doc_id: search_doc.set( { "conversation": [], "visitor_id": data.visitor_id, "user_id": data.user_id, } ) user_id = data.user_id or (search_doc.get().to_dict() or {}).get( "user_id", "" ) search_doc.update( { "conversation": firestore.ArrayUnion( [{"author": "user", "message": user_query}] ), "user_id": user_id, } ) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error creating Firestore document. " + str(e), ) from e try: payload = json.dumps( { "query": data.query, "visitor_id": data.visitor_id, "search_doc_id": search_doc.id, "image": data.image, } ).encode("utf-8") future = publisher.publish(website_topic_path, payload) future.result() except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error publishing the message to pubsub. " + str(e), ) from e return InitiateVertexAISearchResponse(document_id=search_doc.id) @router.post(path="/salesforce-email-support") def salesforce_email_support( data: SalesforceEmailSupportRequest, ) -> SalesforceEmailSupportResponse: """ # Salesforce email support ## Request body [SalesforceEmailSupportRequest] **email_content**: *string* - Email content **email_html**: *string* - Email html **email_address**: *string* - Email address **user_name**: *string* - User name **subject**: *string* - Email subject **case_number**: *string* - Case number **salesforce_thread_id**: *string* - Salesforce thread id **is_human_talking**: *bool* - Whether a human should answer instead of the AI model ## Response body for [SalesforceEmailSupportResponse] **email_response**: *string* - Email response **is_human_talking**: *bool* - Whether a human should answer instead of the AI model **docs_id**: *string* - Google Docs Id for the human response ## Raises: **HTTPException** - *400* - Error """ # Retrieve or create Vertex AI Search conversation ( case_dict, email_thread, conversation, ) = utils_salesforce.get_resources(data) # Remove old messages email_content = data.email_content.split("wrote:")[0] # Retrieve questions from email questions = utils_salesforce.get_questions_from_email(email_content) # Remove questions with "agent" or "human" words in it filtered_questions = [] for q in questions: if not ("human" in q.lower() or "agent" in q.lower()): filtered_questions.append(q) questions = filtered_questions # Check if customer wants to talk to a human agent is_human_talking = data.is_human_talking if not is_human_talking: is_human_talking = utils_salesforce.is_human_needed(email_content) results_multimodal = [] results = [] # Check if there are attachments and get attachmentId attachments = utils_workspace.get_attachment_ids(email_thread) # If there are attachments if attachments: results_multimodal.append( utils_salesforce.salesforce_multimodal( attachments=attachments, internal_message_id=case_dict["internal_message_id"], conversation=conversation, questions=questions, ) ) else: results = utils_salesforce.results_from_questions( questions=questions, conversation_id=conversation.name ) email_response = utils_workspace.create_salesforce_email_body( results=results, results_multimodal=results_multimodal, user_name=data.user_name, is_human_talking=is_human_talking, ) # Translate email email_response = utils_salesforce.translate_email( email_content, email_response ) # Create and send the email case_dict["docs_id"] = utils_workspace.send_salesforce_email_with_reply( email_response=email_response, request=data, case_dict=case_dict, is_human_talking=is_human_talking, ) utils_salesforce.set_docs_id(data.case_number, case_dict["docs_id"]) return SalesforceEmailSupportResponse( email_response=email_response, is_human_talking=is_human_talking, docs_id=case_dict["docs_id"], ) @router.post(path="/send-email-from-docs") def send_email_from_docs(data: SendEmailFromDocsRequest) -> str: """ # Send email from Google Docs ## Request body [SendEmailFromDocsRequest] ** docs_id**: *string* - Google Docs Id to get the email from ## Raises: **HTTPException** - *404* - Case document not found ## Returns: - ok """ case_number, email_response = utils_workspace.get_email_from_docs( data.docs_id ) document = db.collection("salesforce_cases").document(case_number) document_snapshot = document.get() document_dict = document_snapshot.to_dict() if document_snapshot.exists and document_dict: utils_workspace.send_human_email( email_response=email_response, user_email_address=document_dict.get("user_email_address", ""), subject=document_dict.get("subject", ""), email_thread_id=document_dict.get("email_thread_id", ""), email_message_id=document_dict.get("email_message_id", ""), salesforce_thread_id=document_dict.get("salesforce_thread_id", ""), internal_thread_id=document_dict.get("internal_thread_id", ""), ) return "ok" raise HTTPException(status_code=404, detail="Case document not found.") @router.post(path="/translate-text") def translate_text(data: TranslateTextRequest) -> TranslateTextResponse: """ # Translate text ## Request body [TranslateTextRequest] **text**: *list[string]* - List of strings representing the text to translate **target_language**: *string* - Target language **source_language**: *string* - Source language ## Response body [TranslateTextResponse] **translation**: *list[string]* - List of strings representing the translated text ## Raises: **HTTPException** - *400* - Error """ try: results = [] i = 0 while i * 128 < len(data.text): result = translate_client.translate( data.text[i * 128 : i * 128 + 128], target_language=data.target_language, source_language=data.source_language, ) results.extend(result) i += 1 except Exception as e: raise HTTPException( status_code=400, detail="Something went wrong. Please try again. " + str(e), ) from e return TranslateTextResponse(translation=results) @router.post(path="/trigger-unstructured-recommendations") def trigger_unstructured_recommendations( data: TriggerUnstructuredRecommendationsRequest, ) -> str: """ # Trigger Unstructured Recommendations (PubSub) ## Request body [TriggerUnstructuredRecommendationsRequest] **message**: *dict* - Pubsub message representing the recommendations request ## Raises: **HTTPException** - *400* - Could not load pubsub message **HTTPException** - *400* - Error writing document to Firestore ## Returns: - ok """ try: message = base64.b64decode(data.message["data"]).decode("utf-8") message_dict = json.loads(message) except Exception as e: raise HTTPException( status_code=400, detail="Could not load pubsub message. " + str(e) ) from e if ( message_dict["event_type"] == "view-item" and not message_dict["documents"] ): return "ok" results = [] if message_dict["recommendations_type"] == "new-and-featured": results = utils_cloud_sql.get_random_products(size=10) else: try: documents = utils_recommendations.get_recommendations( recommendations_type=message_dict["recommendations_type"], event_type=message_dict["event_type"], user_pseudo_id=message_dict["user_pseudo_id"], documents=message_dict["documents"], optional_user_event_fields=message_dict[ "optional_user_event_fields" ], ) documents_dict = Message.to_dict(documents).get("results") if not documents_dict: raise FileNotFoundError("No documents returned") except (FileNotFoundError, GoogleAPICallError) as e: print(e) results = utils_cloud_sql.get_random_products(size=10) else: results = utils_cloud_sql.get_products( [doc["id"] for doc in documents_dict] ) try: db.collection("website_recommendations").document( message_dict["recommendations_doc_id"] ).set({"recommendations": results}, merge=True) except Exception as e: raise HTTPException( status_code=400, detail="Error writing document to Firestore. " + str(e), ) from e return "ok" @router.post(path="/trigger-unstructured-search") def trigger_unstructured_search( data: TriggerUnstructuredSearchRequest, ) -> str: """ ### Request body [TriggerUnstructuredSearchRequest] **message**: *dict* - Pubsub message representing the search request ## Raises: **HTTPException** - *400* - Could not load pubsub message **HTTPException** - *400* - Error writing document to Firestore ## Returns: - ok """ try: message = base64.b64decode(data.message["data"]).decode("utf-8") message_dict = json.loads(message) except Exception as e: raise HTTPException( status_code=400, detail="Could not load pubsub message. " + str(e) ) from e search_doc = db.collection("website_search").document( message_dict["search_doc_id"] ) search_doc_dict = search_doc.get().to_dict() or {} conversation_id = search_doc_dict.get("conversation_id", "") if not conversation_id: conversation_id = utils_search.create_new_conversation( datastore_id=config["website_search"]["website_datastore_id"], user_pseudo_id=message_dict["visitor_id"], ).name search_doc.update({"conversation_id": conversation_id}) search_doc_dict = search_doc.get().to_dict() or {} conversation_history = utils_search.generate_conversation_history( message_dict, search_doc_dict ) try: firestore_payload = { "conversation": conversation_history, } search_doc.set(firestore_payload, merge=True) except Exception as e: raise HTTPException( status_code=400, detail="Error writing document to Firestore. " + str(e), ) from e return "ok" # ---------------------------------POST---------------------------------------# @router.post(path="/add-order") def add_order(order: OrderRequest) -> str: """ # Add user order **order_date**: *string* - Date of Order **order_status** : *string* - Order Status **order_items**: *list* - List of items ordered **user_id**: *string* - User who ordered **email**: *string* - user email **total_amount**: *float* - Amount of order **is_delivery**: *boolean* - Home delivery **is_pickup**: *boolean* - Pickup from the store **pickup_datetime**: *string* - scheduled pickup time ## Returns - orderid ## Raises **HTTPException** - *400* - Error setting in Firestore - Firestore could not set the order """ try: doc = db.collection("orders").document() doc.set(order.model_dump()) except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error setting in Firestore" + str(e) ) from e try: payload = json.dumps( { "email": order.email, "user_id": order.user_id, "documents": order.order_items } ).encode("utf-8") future = publisher.publish(email_recommendation_topic_path, payload) future.result() except GoogleAPICallError as e: raise HTTPException( status_code=400, detail="Error publishing the message to pubsub. " + str(e), ) from e return str(doc.id) @router.post(path="/trigger-recommendation-email") def trigger_recommendation_email( data: TriggerRecommendationEmailRequest, ) -> str: """ # Trigger Recommendation Email (PubSub) ## Request body [TriggerRecommendationEmailRequest] **message**: *dict* - Pubsub message representing the recommendations request ## Raises: **HTTPException** - *400* - Could not load pubsub message **HTTPException** - *400* - Error writing document to Firestore ## Returns: - ok """ try: message = base64.b64decode(data.message["data"]).decode("utf-8") message_dict = json.loads(message) except Exception as e: raise HTTPException( status_code=400, detail="Could not load pubsub message. " + str(e) ) from e results = [] try: documents = utils_recommendations.get_recommendations( recommendations_type="others-you-may-like", event_type='add-to-cart', user_pseudo_id=message_dict["user_id"], documents=[str(message_dict["documents"][0]['id'])] ) documents_dict = Message.to_dict(documents).get("results") if not documents_dict: raise FileNotFoundError("No documents returned") except (FileNotFoundError, GoogleAPICallError) as e: print(e) results = utils_cloud_sql.get_random_products(size=3) else: results = utils_cloud_sql.get_products( [doc["id"] for doc in documents_dict] ) try: recommendation_email = email_template i=1 for record in results: recommendation_email = recommendation_email.replace( "{PRODUCT_TITLE_"+str(i)+"}",record['title']) recommendation_email = recommendation_email.replace( "{PRODUCT_CATEGORY_"+str(i)+"}",record['categories'][0]) recommendation_email = recommendation_email.replace( "{PRODUCT_DESC_"+str(i)+"}",record['description']) recommendation_email = recommendation_email.replace( "{PRODUCT_IMG_"+str(i)+"}",record['image']) i+=1 utils_workspace.send_email_single_thread( recommendation_email,message_dict['email'], "Thank you for ordering from Cymbal!") except Exception as e: raise HTTPException( status_code=400, detail="Error sending email, " + str(e), ) from e return "ok"