backend-apis/app/utils/utils_salesforce.py (269 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils for Salesforce
"""
import base64
import binascii
import json
import tomllib
import numpy as np
from fastapi.exceptions import HTTPException
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import firestore, translate_v2
from google.cloud.discoveryengine_v1beta.types.conversation import Conversation
from proto import Message
from app.models.p1_model import SalesforceEmailSupportRequest
from app.utils import (
utils_cloud_sql,
utils_palm,
utils_search,
utils_vertex_vector,
utils_workspace,
)
# Load the application configuration once, at import time. The path is
# relative, so it is resolved against the process working directory.
with open("app/config.toml", "rb") as f:
    config = tomllib.load(f)
# Settings read from the [global] section.
project_id = config["global"]["project_id"]
location = config["global"]["location"]
# Settings read from the [multimodal] section.
multimodal_model = config["multimodal"]["multimodal_model"]
index_endpoint_id = config["multimodal"]["index_endpoint_id"]
deployed_index_id = config["multimodal"]["deployed_index_id"]
vector_api_endpoint = config["multimodal"]["vector_api_endpoint"]
# Prompt template used by get_questions_from_email; it is filled with
# str.format(email_content).
prompt_questions = config["salesforce"]["prompt_email_support"]
datastore_location = config["global"]["datastore_location"]
# NOTE: the email datastore id is taken from the website_search section
# (website_datastore_id); it is the datastore passed when a new search
# conversation is created in get_resources.
email_datastore_id = config["website_search"]["website_datastore_id"]
# Clients shared by every function in this module; they are created as a
# side effect of importing the module.
translate_client = translate_v2.Client()
embeddings_client = utils_palm.EmbeddingPredictionClient(project=project_id)
db = firestore.Client()
def results_from_questions(questions: list, conversation_id: str) -> list:
    """Run a multi-turn search for every text-only question.

    Questions that mention "image", "picture" or "attach"
    (case-insensitive) are skipped. A question whose search call raises
    GoogleAPICallError is printed and skipped as well.

    Args:
        questions: list
            Questions to be searched
        conversation_id: str
            Conversation id forwarded to the multi-turn search
    Returns:
        list
            One dict per answered question with the keys "question",
            "summary_text" and "links" (at most two links)
    """
    skip_keywords = ("image", "picture", "attach")
    answers = []

    for question in questions:
        lowered = question.lower()
        if any(keyword in lowered for keyword in skip_keywords):
            continue

        try:
            response = utils_search.vertexai_search_multiturn(
                search_query=question,
                conversation_id=conversation_id,
                summary_result_count=2,
                datastore_id=config["website_search"]["website_datastore_id"],
            )
            response_dict = Message.to_dict(response)
        except GoogleAPICallError as error:
            print(error)
            continue

        answers.append(
            {
                "question": question,
                "summary_text": response_dict.get("reply", {}).get(
                    "reply", ""
                ),
                # Only the first two search results are turned into links.
                "links": [
                    config["salesforce"]["website_uri"] + hit["id"]
                    for hit in response_dict.get("search_results", [])[:2]
                ],
            }
        )

    return answers
def set_docs_id(case_number: str, docs_id: str):
    """Store the Google Docs id of a case in Firestore.

    The value is written with merge=True to the "salesforce_cases"
    document of the case.

    Args:
        case_number: str
            The case number. Used as the document id in Firestore.
        docs_id: str
            The case Google Docs id.
    """
    db.collection("salesforce_cases").document(case_number).set(
        {"docs_id": docs_id}, merge=True
    )
def translate_email(email_content: str, email_response: str) -> str:
    """Translate email response to the email content language
    using Google Translation API.

    The language of email_content is detected first. When it is English
    the response is returned unchanged. Otherwise the response is split
    on whitespace and translated in chunks of 128 words, and the
    translated chunks are joined back with a single space.

    Args:
        email_content: str
            Email content
        email_response: str
            Email response (written in English)
    Returns:
        str
            Translated email response to the email content language
    """
    words_per_request = 128

    target_language = translate_client.detect_language(email_content)[
        "language"
    ]
    if target_language == "en":
        return email_response

    words = email_response.split()
    translated_chunks = []
    for start in range(0, len(words), words_per_request):
        result = translate_client.translate(
            " ".join(words[start : start + words_per_request]),
            target_language=target_language,
            source_language="en",
        )
        translated_chunks.append(result["translatedText"])

    # Start from the translated chunks only (not from the English
    # response) and keep a separator between chunks so the words at a
    # chunk boundary are not fused together. The "> " -> ">" replacement
    # is the same post-processing that was applied to each chunk before.
    return " ".join(translated_chunks).replace("> ", ">")
def get_resources(
    data: SalesforceEmailSupportRequest,
) -> tuple[dict, dict, Conversation]:
    """Get salesforce resources in Firestore.

    Try to create resources if they don't exist.

    When the case document already exists, its Gmail thread and search
    conversation are loaded and the message ids are refreshed in the
    returned dict only (nothing is written back to Firestore). Otherwise
    a new search conversation is created, the Gmail thread is looked up
    by sender address and subject, and the new case is saved in Firestore.

    Args:
        data: SalesforceEmailSupportRequest
            The email support request
    Raises:
        HTTPException:
            If it is not possible to get the thread id
    Returns:
        tuple
            case_dict, the Gmail thread and the Conversation
    """
    case_ref = db.collection("salesforce_cases").document(data.case_number)
    snapshot = case_ref.get()

    case_dict = snapshot.to_dict()
    if not case_dict:
        # No stored data: start from the fields carried by the request.
        case_dict = {
            "user_email_address": data.email_address,
            "subject": data.subject,
            "user_name": data.user_name,
            "salesforce_thread_id": data.salesforce_thread_id,
        }

    if snapshot.exists:
        # Known case: reuse the stored thread and conversation.
        email_thread = utils_workspace.get_gmail_thread(
            user_id=config["salesforce"]["user_id"],
            internal_thread_id=case_dict["internal_thread_id"],
        )
        case_dict["email_message_id"] = utils_workspace.get_last_message_id(
            email_thread
        )
        case_dict["internal_message_id"] = email_thread["messages"][0]["id"]
        conversation = utils_search.get_search_conversation(
            conversation_resource=case_dict["conversation_resource"]
        )
        return case_dict, email_thread, conversation

    # New case: create the conversation and locate the Gmail thread.
    case_dict["docs_id"] = ""
    conversation = utils_search.create_new_conversation(
        datastore_id=email_datastore_id,
        user_pseudo_id=data.email_address,
    )
    case_dict["conversation_resource"] = conversation.name

    matching_threads = utils_workspace.list_gmail_threads(
        user_id=config["salesforce"]["user_id"],
        email_address=data.email_address,
        subject=data.subject,
    )
    try:
        case_dict["internal_thread_id"] = matching_threads["threads"][0]["id"]
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"{str(e)}. Could not get thread id."
        ) from e

    email_thread = utils_workspace.get_gmail_thread(
        user_id=config["salesforce"]["user_id"],
        internal_thread_id=case_dict["internal_thread_id"],
    )
    case_dict["internal_message_id"] = email_thread["messages"][0]["id"]
    thread_id, message_id = utils_workspace.get_email_thread_id(email_thread)
    case_dict["email_thread_id"] = thread_id
    case_dict["email_message_id"] = message_id

    case_ref.set(case_dict)
    return case_dict, email_thread, conversation
def salesforce_multimodal(
    attachments: list,
    internal_message_id: str,
    conversation: Conversation,
    questions: list,
) -> dict:
    """Try to answer multimodal questions from the email.

    Every attachment is downloaded, base64-decoded and embedded as an
    image. Attachments whose data cannot be decoded are printed and
    skipped. The image embeddings are summed element-wise and handed to
    embeddings_search together with the questions.

    Args:
        attachments: list
            Attachment ids from the email
        internal_message_id: str
            Internal message id
        conversation: Conversation
            Current conversation resource
        questions: list
            Questions extracted from the email
    Returns:
        dict
            A dict representing the result. Empty when no attachment
            produced an embedding.
    """
    # Maps the URL-safe base64 alphabet to the standard one and drops
    # spaces, in a single pass.
    to_standard_base64 = str.maketrans("_-", "/+", " ")

    images_embeddings = []
    for attachment_id in attachments:
        try:
            email_attachment = utils_workspace.get_attachment(
                attachment_id=attachment_id,
                internal_message_id=internal_message_id,
                user_id=config["salesforce"]["user_id"],
            )
            encoded = email_attachment["data"].translate(to_standard_base64)
            image_bytes = base64.b64decode(encoded)
        except binascii.Error as error:
            print(error)
            continue

        embedding = embeddings_client.get_embedding(image_bytes=image_bytes)
        images_embeddings.append(embedding.image_embedding)

    if not images_embeddings:
        return {}

    summed_embedding = list(np.sum(np.array(images_embeddings), axis=0))
    return embeddings_search(
        image_embedding=summed_embedding,
        conversation=conversation,
        questions=questions,
    )
def get_questions_from_email(email_content: str) -> list:
    """Get questions from email using GenAI.

    The model is prompted with the email content and is expected to
    answer with a JSON object whose "questions_inquiries_orders" key
    holds a list of questions. Markdown code fences around the JSON are
    removed before parsing.

    Args:
        email_content: str
            Email content
    Returns:
        list
            Questions found by GenAI. Empty when the model call fails,
            when the answer is not valid JSON, or when the answer does
            not have the expected shape.
    """
    try:
        generated = utils_palm.text_generation(
            prompt=prompt_questions.format(email_content)
        )
        generated = generated.replace("```json", "").replace("```", "")
        parsed = json.loads(generated)
    except (json.JSONDecodeError, GoogleAPICallError) as e:
        print(e)
        return []

    # json.loads may legitimately return a list, string, number or None;
    # only a JSON object can carry the expected key.
    if not isinstance(parsed, dict):
        return []

    questions = parsed.get("questions_inquiries_orders", [])
    if not isinstance(questions, list):
        return []

    # Callers call str methods on every question, so keep strings only.
    return [question for question in questions if isinstance(question, str)]
def is_human_needed(email_content: str) -> bool:
    """Check if customer wants to talk to a human agent.

    The email is inserted into the "prompt_talk_to_human" prompt and the
    generated answer is searched, case-insensitively, for "true".

    Args:
        email_content: str
            Email content
    Returns:
        bool
            If human is needed
    """
    prompt = config["salesforce"]["prompt_talk_to_human"].format(
        email_content
    )
    answer = utils_palm.text_generation(prompt=prompt)
    return "true" in answer.lower()
def embeddings_search(
    image_embedding: list[float], conversation: Conversation, questions: list
) -> dict:
    """Search multimodal questions from email

    Questions mentioning "image", "picture" or "attach"
    (case-insensitive) are embedded and combined with the image
    embedding; without such questions the image embedding is used alone.
    The two nearest neighbors are looked up, a reply is generated from
    the matching products and the exchange is appended to the
    conversation.

    Args:
        image_embedding: list[float]
            Attached image embedding
        conversation: Conversation
            Current conversation resource
        questions: list
            Questions extracted from the email
    Returns:
        dict
            A dict representing the result, with the keys "question",
            "links", "snapshots" and "summary_text"
    """
    image_keywords = ("image", "picture", "attach")
    multimodal_questions = [
        question
        for question in questions
        if any(keyword in question.lower() for keyword in image_keywords)
    ]

    if multimodal_questions:
        # Combine embeddings: Texts + Images
        text_embeddings = [
            embeddings_client.get_embedding(text=question).text_embedding
            for question in multimodal_questions
        ]
        summed_text_embedding = list(
            np.sum(np.array(text_embeddings), axis=0)
        )
        multimodal_embedding = utils_palm.reduce_embedding_dimension(
            vector_text=summed_text_embedding,
            vector_image=image_embedding,
        )
        conversation_input = " \n".join(multimodal_questions)
    else:
        # Combine embeddings: Images
        multimodal_embedding = utils_palm.reduce_embedding_dimension(
            vector_image=image_embedding
        )
        conversation_input = "Show products similar to the attached image."

    neighbors = utils_vertex_vector.find_neighbor(
        feature_vector=multimodal_embedding,
        neighbor_count=2,
    )

    links = []
    snapshots = []
    for neighbor in neighbors.nearest_neighbors[0].neighbors:
        links.append(
            config["salesforce"]["website_uri"]
            + neighbor.datapoint.datapoint_id
        )
        product = utils_cloud_sql.get_product(
            int(neighbor.datapoint.datapoint_id)
        )
        if product:
            snapshots.append(utils_cloud_sql.convert_product_to_dict(product))

    # Snapshots are numbered from "1" in the references given to the model.
    references = json.dumps(
        {
            str(position): snapshot
            for position, snapshot in enumerate(snapshots, start=1)
        }
    )
    reply = utils_palm.text_generation(
        prompt=config["salesforce"]["prompt_generate_reply"].format(
            references=references
        ),
    )

    # Append to current Conversation
    utils_search.update_search_conversation(
        conversation=conversation,
        append_to_conversation={"input": conversation_input, "reply": reply},
    )

    return {
        "question": conversation_input,
        "links": links,
        "snapshots": snapshots,
        "summary_text": reply,
    }