backend-apis/app/utils/utils_workspace.py (574 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Utils for Workspace
"""

import base64
import html
import json
import tomllib
import uuid
from email.mime.text import MIMEText

from fastapi import HTTPException
from google.cloud import secretmanager
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

from app.models.p1_model import SalesforceEmailSupportRequest
from app.utils.utils_ws_protocols import (
    DocsProtocol,
    DocumentResource,
    DriveProtocol,
    GmailProtocol,
    ScriptProtocol,
)

with open("app/config.toml", "rb") as f:
    config = tomllib.load(f)

calendar_id = config["workspace"]["calendar_id"]
project_id = config["global"]["project_id"]
calendar_secret_id = config["workspace"]["calendar_secret_id"]

# The calendar client is built eagerly at import time from OAuth user info
# stored in Secret Manager.
secret_client = secretmanager.SecretManagerServiceClient()
secret_user_info = secret_client.access_secret_version(
    request={"name": calendar_secret_id}
)
workspace_user_info = json.loads(secret_user_info.payload.data.decode("UTF-8"))
calendar_credentials = Credentials.from_authorized_user_info(
    info=workspace_user_info, scopes=config["workspace"]["calendar_scopes"]
)
calendar_service = build(
    serviceName="calendar", version="v3", credentials=calendar_credentials
)

# MIME types treated as image attachments. "image/jpeg" is the registered
# type; "image/jpg" is kept for backward compatibility.
_IMAGE_MIME_TYPES = frozenset({"image/png", "image/jpeg", "image/jpg"})


class WorkspaceServices:
    """Lazily built, cached clients for the Workspace APIs used in this module.

    Gmail and Apps Script clients are built with OAuth user credentials;
    Docs and Drive clients are built with service-account credentials. Both
    credential sets are read from Secret Manager using the secret names in
    ``config["salesforce"]``.
    """

    _user_credentials: Credentials | None = None
    _sa_credentials: service_account.Credentials | None = None
    _secret_client: secretmanager.SecretManagerServiceClient | None = None
    _gmail_service: GmailProtocol | None = None
    _drive_service: DriveProtocol | None = None
    _script_service: ScriptProtocol | None = None
    _docs_service: DocsProtocol | None = None

    def _get_secret_client(self) -> secretmanager.SecretManagerServiceClient:
        """Return the cached Secret Manager client, creating it on first use."""
        if self._secret_client is None:
            self._secret_client = secretmanager.SecretManagerServiceClient()
        return self._secret_client

    def _get_user_credentials(self) -> Credentials:
        """Return cached OAuth user credentials loaded from Secret Manager."""
        if self._user_credentials is None:
            secret_user_info = self._get_secret_client().access_secret_version(
                request={"name": config["salesforce"]["user_secret_name"]}
            )
            workspace_user_info = json.loads(
                secret_user_info.payload.data.decode("UTF-8")
            )
            self._user_credentials = Credentials.from_authorized_user_info(
                info=workspace_user_info,
                scopes=config["salesforce"]["email_scopes"],
            )
        return self._user_credentials

    def _get_sa_credentials(self) -> service_account.Credentials:
        """Return cached service-account credentials from Secret Manager."""
        if self._sa_credentials is None:
            sa_secret_info = self._get_secret_client().access_secret_version(
                request={"name": config["salesforce"]["sa_secret_name"]}
            )
            self._sa_credentials = (
                service_account.Credentials.from_service_account_info(
                    info=json.loads(
                        sa_secret_info.payload.data.decode("UTF-8")
                    ),
                    scopes=config["salesforce"]["workspace_scopes"],
                )
            )
        return self._sa_credentials

    def gmail(self) -> GmailProtocol:
        """Gmail Service

        Returns:
            Resource Gmail Service
        """
        if self._gmail_service is None:
            self._gmail_service = build(
                "gmail", "v1", credentials=self._get_user_credentials()
            )
        return self._gmail_service

    def script(self) -> ScriptProtocol:
        """Script Service

        Returns:
            Resource Script Service
        """
        if self._script_service is None:
            self._script_service = build(
                "script", "v1", credentials=self._get_user_credentials()
            )
        return self._script_service

    def docs(self) -> DocsProtocol:
        """Docs Service

        Returns:
            Resource Docs Service
        """
        if self._docs_service is None:
            self._docs_service = build(
                "docs", "v1", credentials=self._get_sa_credentials()
            )
        return self._docs_service

    def drive(self) -> DriveProtocol:
        """Drive Service

        Returns:
            Resource Drive Service
        """
        if self._drive_service is None:
            self._drive_service = build(
                "drive", "v3", credentials=self._get_sa_credentials()
            )
        return self._drive_service


ws = WorkspaceServices()


def _last_header_value(message: dict, header_name: str) -> str:
    """Return the value of the last header named ``header_name`` (lowercase).

    Returns an empty string when the message carries no such header.
    """
    value = ""
    for header in message["payload"]["headers"]:
        if header["name"].lower() == header_name:
            value = header["value"]
    return value


def _build_raw_message(
    body_html: str,
    to_address: str,
    subject: str,
    in_reply_to: str | None = None,
    references: str | None = None,
) -> str:
    """Build an HTML email and return it URL-safe base64 encoded.

    The sender is always ``config["salesforce"]["user_id"]``. The reply
    headers are only set when a value is supplied.
    """
    message = MIMEText(body_html, "html")
    message["To"] = to_address
    message["From"] = config["salesforce"]["user_id"]
    message["Subject"] = subject
    if in_reply_to is not None:
        message["In-Reply-To"] = in_reply_to
    if references is not None:
        message["References"] = references
    return base64.urlsafe_b64encode(message.as_bytes()).decode()


def get_last_message_id(email_thread: dict) -> str:
    """Return the ``Message-ID`` header of the last message in a thread.

    Args:
        email_thread: Gmail thread resource containing a ``messages`` list.

    Raises:
        HTTPException: 400 if the thread does not have the expected shape.

    Returns:
        The header value, or an empty string if the header is absent.
    """
    try:
        return _last_header_value(email_thread["messages"][-1], "message-id")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


def get_gmail_thread(user_id: str, internal_thread_id: str) -> dict:
    """Fetch a Gmail thread by its Gmail-internal thread id.

    Args:
        user_id: Gmail user the thread belongs to.
        internal_thread_id: Gmail thread id.

    Raises:
        HTTPException: 400 if the Gmail API call fails.

    Returns:
        The thread resource returned by the Gmail API.
    """
    try:
        thread = (
            ws.gmail()
            .users()
            .threads()
            .get(userId=user_id, id=internal_thread_id)
            .execute(num_retries=20)
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return thread


def list_gmail_threads(
    user_id: str, email_address: str, subject: str, max_results: int = 1
) -> dict:
    """List inbox threads from a sender that match a subject.

    Args:
        user_id: Gmail user whose mailbox is searched.
        email_address: Sender address used in the ``from:`` filter.
        subject: Subject text used in the quoted ``subject:`` filter.
        max_results: Maximum number of threads to return.

    Raises:
        HTTPException: 400 if the Gmail API call fails.

    Returns:
        The list response returned by the Gmail API.
    """
    try:
        # NOTE(review): a double quote inside `subject` ends the quoted
        # phrase early; confirm whether callers sanitize the subject.
        query = f'from:{email_address} label:inbox subject:"{subject}"'
        list_threads = (
            ws.gmail()
            .users()
            .threads()
            .list(userId=user_id, maxResults=max_results, q=query)
            .execute(num_retries=20)
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return list_threads


def get_email_thread_id(email_thread: dict) -> tuple:
    """Return the ``Message-ID`` headers of a thread's first and last message.

    Args:
        email_thread: Gmail thread resource containing a ``messages`` list.

    Raises:
        HTTPException: 400 if the thread does not have the expected shape.

    Returns:
        ``(email_thread_id, email_message_id)``: the ``Message-ID`` of the
        first message and of the last message. Either is an empty string
        when the header is absent.
    """
    try:
        messages = email_thread["messages"]
        email_thread_id = _last_header_value(messages[0], "message-id")
        email_message_id = _last_header_value(messages[-1], "message-id")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return email_thread_id, email_message_id


def _collect_image_attachment_ids(parts: list, found: dict) -> None:
    """Record attachment ids of image parts, descending into multiparts.

    ``found`` is used as an insertion-ordered set (keys only).
    """
    for part in parts:
        mime_type = part.get("mimeType", "")
        if mime_type in _IMAGE_MIME_TYPES:
            # Parts whose body carries no attachmentId are skipped instead
            # of failing the whole request.
            attachment_id = part.get("body", {}).get("attachmentId")
            if attachment_id:
                found[attachment_id] = None
        elif "multipart" in mime_type:
            _collect_image_attachment_ids(part.get("parts", []), found)


def get_attachment_ids(email_thread: dict) -> list:
    """Return the ids of image attachments on the last message of a thread.

    PNG and JPEG parts are considered, at any multipart nesting depth.

    Args:
        email_thread: Gmail thread resource containing a ``messages`` list.

    Raises:
        HTTPException: 400 if the thread does not have the expected shape.

    Returns:
        Unique attachment ids in the order they appear in the message.
    """
    attachments: dict = {}
    try:
        email_parts = email_thread["messages"][-1]["payload"].get("parts", [])
        _collect_image_attachment_ids(email_parts, attachments)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return list(attachments)


def get_attachment(
    attachment_id: str, internal_message_id: str, user_id: str
) -> dict:
    """Fetch one attachment of a Gmail message.

    Args:
        attachment_id: Gmail attachment id.
        internal_message_id: Gmail id of the message holding the attachment.
        user_id: Gmail user the message belongs to.

    Raises:
        HTTPException: 400 if the Gmail API call fails.

    Returns:
        The attachment resource returned by the Gmail API.
    """
    try:
        attachment = (
            ws.gmail()
            .users()
            .messages()
            .attachments()
            .get(
                userId=user_id, messageId=internal_message_id, id=attachment_id
            )
            .execute(num_retries=20)
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return attachment


def _render_answers(answers: list) -> str:
    """Render question/answer/link entries as HTML paragraphs.

    Each entry must provide ``question``, ``summary_text`` and ``links``.
    """
    rendered = []
    for answer in answers:
        # Link URLs land inside an attribute value, so they are escaped.
        joined_links = "\n".join(
            f'<li><a href="{html.escape(str(link), quote=True)}">'
            "Reference</a></li>"
            for link in answer["links"]
        )
        summary = (
            answer["summary_text"]
            or "Here are some links you might find useful."
        )
        # NOTE(review): question and summary are inserted unescaped, as
        # before; confirm whether upstream guarantees HTML-safe text.
        rendered.append(
            f"<p><b>Question: </b>{answer['question']}</p>"
            f"<p><b>Answer: </b>{summary}</p>"
            f"<ol>{joined_links}</ol>"
        )
    return "".join(rendered)


def create_salesforce_email_body(
    results: list,
    results_multimodal: list,
    user_name: str,
    is_human_talking: bool,
) -> str:
    """Build the HTML body of the automated support reply.

    Multimodal results take precedence: when both lists are non-empty only
    ``results_multimodal`` is rendered.

    Args:
        results: Answers found for questions in the email text.
        results_multimodal: Answers found for the attached image.
        user_name: Customer name used in the greeting.
        is_human_talking: Whether the customer asked for a human agent.

    Returns:
        The complete HTML document as a string.
    """
    email_response = (
        f"<html><body><p>Dear Customer {html.escape(str(user_name))},</p>"
        "<p>Thank you for contacting the support team.</p>"
    )

    if not results and not results_multimodal:
        if not is_human_talking:
            email_response += "<p>How can we help you today?</p>"
        else:
            email_response += (
                "<p>Soon one of our specialists will get in touch with you.</p>"
            )
    elif results_multimodal:
        email_response += (
            "<p>We found some questions about the attached image (multimodal search). "
            "We hope the following answers can be of help to you:</p>"
        )
        email_response += _render_answers(results_multimodal)
    elif results:
        email_response += (
            "<p>We identified some questions in your email. "
            "We hope the following answers can be of help to you:</p>"
        )
        email_response += _render_answers(results)

    if (results or results_multimodal) and is_human_talking:
        email_response += (
            "<p>We also know that you would like to talk to a human agent. "
            "Soon one of our specialists will get in touch with you.</p>"
        )

    email_response += (
        "<p>This response was generated by an AI assistant. "
        'Please respond with "I want to speak with a human agent" to get connected to a human</p>'
        "<p>Best Regards,<br>Your support team</p>"
        "</body></html>"
    )

    return email_response


def send_human_email(
    email_response: str,
    user_email_address: str,
    subject: str,
    email_thread_id: str,
    email_message_id: str,
    salesforce_thread_id: str,
    internal_thread_id: str,
) -> None:
    """Send an HTML reply inside an existing Gmail thread.

    Args:
        email_response: HTML body of the reply.
        user_email_address: Recipient address.
        subject: Email subject.
        email_thread_id: ``Message-ID`` of the thread's first message.
        email_message_id: ``Message-ID`` of the message being replied to.
        salesforce_thread_id: Salesforce thread reference appended to the
            ``References`` header.
        internal_thread_id: Gmail thread id the reply is attached to.

    Raises:
        HTTPException: 400 if building or sending the message fails.
    """
    try:
        encoded_message = _build_raw_message(
            email_response,
            user_email_address,
            subject,
            in_reply_to=email_message_id,
            references=email_thread_id + " " + salesforce_thread_id,
        )
        create_message = {
            "raw": encoded_message,
            "threadId": internal_thread_id,
        }

        ws.gmail().users().messages().send(
            userId=config["salesforce"]["user_id"], body=create_message
        ).execute(num_retries=20)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


def send_email_single_thread(
    email_response_html: str,
    destination_email_address: str,
    email_subject: str,
) -> str:
    """Send a standalone HTML email (not attached to an existing thread).

    Errors from the Gmail API propagate unchanged; they are not converted
    to ``HTTPException`` here.

    Args:
        email_response_html: HTML body of the email.
        destination_email_address: Recipient address.
        email_subject: Email subject.

    Returns:
        The literal string ``"ok"`` once the send call returns.
    """
    encoded_message = _build_raw_message(
        email_response_html, destination_email_address, email_subject
    )

    ws.gmail().users().messages().send(
        userId=config["salesforce"]["user_id"], body={"raw": encoded_message}
    ).execute(num_retries=20)

    return "ok"


def send_salesforce_email_with_reply(
    email_response: str,
    request: SalesforceEmailSupportRequest,
    case_dict: dict,
    is_human_talking: bool,
) -> str:
    """Reply to a support email and, for human hand-off, prepare a case doc.

    The reply is always sent in the case's Gmail thread. When
    ``is_human_talking`` is true, a draft with the same content is also
    created and a Google Doc for the case is reset (if it exists) or copied
    from the template, then filled with the request data.

    Args:
        email_response: HTML body of the reply.
        request: Incoming support request.
        case_dict: Case data; reads ``email_message_id``,
            ``email_thread_id``, ``internal_thread_id`` and ``docs_id``.
        is_human_talking: Whether the customer asked for a human agent.

    Raises:
        HTTPException: 400 if any step fails.

    Returns:
        str Google Docs Id if created or empty if not
    """
    docs_id = ""
    try:
        encoded_message = _build_raw_message(
            email_response,
            request.email_address,
            request.subject,
            in_reply_to=case_dict["email_message_id"],
            references=(
                case_dict["email_thread_id"]
                + " "
                + request.salesforce_thread_id
            ),
        )
        create_message = {
            "raw": encoded_message,
            "threadId": case_dict["internal_thread_id"],
        }

        ws.gmail().users().messages().send(
            userId=config["salesforce"]["user_id"], body=create_message
        ).execute(num_retries=20)

        if is_human_talking:
            # Create email draft
            ws.gmail().users().drafts().create(
                userId=config["salesforce"]["user_id"],
                body={
                    "message": {
                        "raw": encoded_message,
                        "threadId": case_dict["internal_thread_id"],
                    }
                },
            ).execute(num_retries=20)

            if case_dict["docs_id"]:
                reset_docs_content(
                    case_docs_id=case_dict["docs_id"],
                    template_docs_id=config["salesforce"]["docs_template_id"],
                )
                docs_id = case_dict["docs_id"]
            else:
                # Create document to help edit the email
                docs_id = copy_drive_file(
                    drive_file_id=config["salesforce"]["docs_template_id"],
                    parent_folder_id=config["salesforce"]["drive_folder_id"],
                    copy_title=request.case_number,
                )
                add_apps_script_to_case_docs(
                    script_name=f"SendEmail{request.case_number}",
                    file_id=docs_id,
                )

            update_doc(
                document_id=docs_id,
                email_content=request.email_content,
                subject=request.subject,
                user_name=request.user_name,
            )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return docs_id


def set_docs_permission(
    file_id: str, domain: str = "google.com", role: str = "writer"
):
    """Grant a whole domain access to a Drive file without notifying anyone.

    Args:
        file_id: Drive file id.
        domain: Domain receiving the permission.
        role: Drive permission role granted to the domain.

    Returns:
        The permission resource returned by the Drive API.
    """
    permission = {"type": "domain", "domain": domain, "role": role}
    return (
        ws.drive()
        .permissions()
        .create(
            fileId=file_id,
            sendNotificationEmail=False,
            body=permission,
            supportsAllDrives=True,
        )
        .execute(num_retries=20)
    )


def add_apps_script_to_case_docs(script_name: str, file_id: str):
    """Create an Apps Script project bound to a file and upload its code.

    The script source comes from ``config["salesforce"]["apps_script_code"]``.

    Args:
        script_name: Title of the new script project.
        file_id: Drive id of the file the script is attached to.
    """
    script: dict = (
        ws.script()
        .projects()
        .create(body={"title": script_name, "parentId": file_id})
        .execute(num_retries=20)
    )
    request = {
        "files": [
            {
                "name": "Code",
                "type": "SERVER_JS",
                "source": config["salesforce"]["apps_script_code"],
                "functionSet": {
                    "values": [{"name": "sendId"}, {"name": "onOpen"}]
                },
            },
            {
                "name": "appsscript",
                "type": "JSON",
                "source": '{"timeZone": "America/New_York", "exceptionLogging": "CLOUD"}',
            },
        ]
    }
    ws.script().projects().updateContent(
        scriptId=script["scriptId"], body=request
    ).execute(num_retries=20)


def copy_drive_file(
    drive_file_id: str, parent_folder_id: str, copy_title: str
):
    """Copy a Drive file into a folder under a new name.

    Args:
        drive_file_id: Id of the file to copy.
        parent_folder_id: Id of the folder that receives the copy.
        copy_title: Name of the copy.

    Returns:
        The id of the new file, or ``None`` if the response has no ``id``.
    """
    body = {"name": copy_title, "parents": [parent_folder_id]}
    drive_response = (
        ws.drive()
        .files()
        .copy(fileId=drive_file_id, body=body, supportsAllDrives=True)
        .execute(num_retries=20)
    )
    docs_copy_id = drive_response.get("id")

    return docs_copy_id


def reset_docs_content(template_docs_id: str, case_docs_id: str):
    """Replace the body of a case document with the template's content.

    The case document is emptied, then the template's text, text styles
    and paragraph styles are replayed into it at the template's indexes.

    Args:
        template_docs_id: Id of the template document.
        case_docs_id: Id of the document to reset.

    Raises:
        HTTPException: 400 if clearing or rewriting the document fails.
            Errors while reading either document propagate unchanged.
    """
    document = (
        ws.docs()
        .documents()
        .get(documentId=template_docs_id)
        .execute(num_retries=20)
    )
    docs_template_content = document["body"]["content"]

    # get content of the current doc and delete its content
    current_document = (
        ws.docs()
        .documents()
        .get(documentId=case_docs_id)
        .execute(num_retries=20)
    )
    current_document_content = current_document["body"]["content"]

    try:
        # Delete everything. The range stops one short of the last element's
        # end index; an empty range is not sent.
        delete_end_index = current_document_content[-1]["endIndex"] - 1
        if delete_end_index > 1:
            ws.docs().documents().batchUpdate(
                documentId=case_docs_id,
                body={
                    "requests": [
                        {
                            "deleteContentRange": {
                                "range": {
                                    "startIndex": 1,
                                    "endIndex": delete_end_index,
                                }
                            }
                        }
                    ]
                },
            ).execute(num_retries=20)

        # Update document with template (the first structural element is
        # skipped).
        requests = []
        for i in docs_template_content[1:]:
            for j in i["paragraph"]["elements"]:
                requests.append(
                    {
                        "insertText": {
                            "text": j["textRun"]["content"],
                            "location": {"index": j["startIndex"]},
                        }
                    }
                )
                if j["textRun"]["textStyle"]:
                    requests.append(
                        {
                            "updateTextStyle": {
                                "range": {
                                    "startIndex": j["startIndex"],
                                    "endIndex": j["endIndex"],
                                },
                                "textStyle": {**j["textRun"]["textStyle"]},
                                "fields": ", ".join(
                                    j["textRun"]["textStyle"].keys()
                                ),
                            }
                        }
                    )
            paragraph_style = i["paragraph"].get("paragraphStyle", "")
            if paragraph_style:
                # headingId is left out of the field mask.
                style_fields = ", ".join(
                    k for k in paragraph_style if k != "headingId"
                )
                if style_fields:
                    requests.append(
                        {
                            "updateParagraphStyle": {
                                "range": {
                                    "startIndex": i["startIndex"],
                                    "endIndex": i["endIndex"],
                                },
                                "paragraphStyle": {**paragraph_style},
                                "fields": style_fields,
                            }
                        }
                    )

        if requests:
            ws.docs().documents().batchUpdate(
                documentId=case_docs_id, body={"requests": requests}
            ).execute(num_retries=20)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


def update_doc(
    document_id: str, email_content: str, subject: str, user_name: str
):
    """Fill the ``{{...}}`` placeholders of a case document.

    Args:
        document_id: Id of the document to update.
        email_content: Replacement for ``{{email-content}}``.
        subject: Replacement for ``{{subject}}``.
        user_name: Replacement for ``{{user-name}}``.
    """
    replacements = (
        ("{{email-content}}", email_content),
        ("{{subject}}", subject),
        ("{{user-name}}", user_name),
    )
    requests = [
        {
            "replaceAllText": {
                "containsText": {"text": placeholder, "matchCase": True},
                "replaceText": value,
            }
        }
        for placeholder, value in replacements
    ]

    ws.docs().documents().batchUpdate(
        documentId=document_id, body={"requests": requests}
    ).execute(num_retries=20)


def get_email_from_docs(docs_id: str) -> tuple:
    """Extract the email body written between two markers in a case doc.

    Text is collected from the first text run containing ``message_body``
    up to the next one. The structural element right after the opening
    marker is skipped. Each collected paragraph is wrapped in ``<p>`` tags
    and newlines become ``<br>``.

    Args:
        docs_id: Id of the case document.

    Returns:
        ``(case_number, email)``: the document title and the extracted
        HTML wrapped in ``<body>`` tags.
    """
    document: DocumentResource = (
        ws.docs().documents().get(documentId=docs_id).execute(num_retries=20)
    )
    document_content = document["body"]["content"]
    case_number = document["title"]

    found = False
    jump = False
    complete = False
    pieces = ["<body>"]

    for structural_element in document_content:
        if complete:
            break
        if jump:
            jump = False
            continue

        paragraph = structural_element.get("paragraph")
        # Only paragraphs that start after the opening marker get <p> tags,
        # so every opened tag is closed and no stray </p> is emitted.
        opened = found and paragraph is not None
        if opened:
            pieces.append("<p>")

        for element in (paragraph or {}).get("elements", []):
            # Elements without a text run contribute no text.
            content = element.get("textRun", {}).get("content", "")
            if "message_body" in content:
                if not found:
                    found = True
                    jump = True
                    continue
                complete = True
                break
            if found:
                pieces.append(content)

        if opened:
            pieces.append("</p>")

    email = "".join(pieces).replace("\n", "<br>")
    email += "</body>"

    return case_number, email


def create_calendar_event(
    event_summary: str, attendees: list[str], start_date: str, end_date: str
) -> dict:
    """Create a calendar event with a Google Meet conference.

    The event is inserted into the calendar configured as
    ``config["workspace"]["calendar_id"]``.

    Args:
        event_summary: Event title.
        attendees: Email addresses of the attendees.
        start_date: Value sent as ``start.dateTime``.
        end_date: Value sent as ``end.dateTime``.

    Returns:
        The event resource returned by the Calendar API.
    """
    event_data = {
        "summary": event_summary,
        "start": {
            "dateTime": start_date,
        },
        "end": {
            "dateTime": end_date,
        },
        "attendees": [{"email": attendee} for attendee in attendees],
        "conferenceData": {
            "createRequest": {
                # Must be unique per request: the Calendar API ignores a
                # create request that repeats the previous requestId.
                "requestId": uuid.uuid4().hex,
                "conferenceSolutionKey": {"type": "hangoutsMeet"},
            }
        },
    }
    event = (
        calendar_service.events()
        .insert(
            calendarId=calendar_id, body=event_data, conferenceDataVersion=1
        )
        .execute(num_retries=20)
    )

    return event