ees_microsoft_teams/microsoft_teams_user_messages.py (269 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module fetches all the messages, attachments, chat tabs, and meeting recordings from Microsoft Teams.
"""
from collections import defaultdict

import requests

from . import constant
from .microsoft_teams_client import MSTeamsClient
from .utils import (extract_api_response, get_schema_fields, html_to_text,
                    url_encode)

USER_CHAT_ATTACHMENT = "User Chat Attachments"
MEETING_RECORDING = "Meeting Recording"
USER_CHAT_TABS = "User Chat Tabs"


class MSTeamsUserMessage:
    """Fetches user chats, chat messages, chat attachments, chat tabs and meeting recordings
    from Microsoft Teams and converts them into documents to be indexed.
    """

    def __init__(self, access_token, logger, config, local_storage):
        """
        :param access_token: Access token used to build the Microsoft Teams client
        :param logger: Logger object
        :param config: Configuration object exposing `get_value`
        :param local_storage: Object exposing `insert_document_into_doc_id_storage`, used to
            record every fetched id in `ids_list`
        """
        self.token = access_token
        self.client = MSTeamsClient(logger, self.token, config)
        self.logger = logger
        self.is_permission_sync_enabled = config.get_value("enable_document_permission")
        self.config = config
        self.object_type_to_index = config.get_value('object_type_to_index')
        self.local_storage = local_storage

    def _allowed_permissions(self, chat_id):
        """Returns the `_allow_permissions` value for a document belonging to the given chat
        :param chat_id: Id of the chat
        Returns:
            A list holding the chat id when permission sync is enabled, otherwise an empty list
        """
        return [chat_id] if self.is_permission_sync_enabled else []

    def get_attachments(
        self, user_id, prefix, attachment_name, attachment_id, chat_id, updated_date, ids_list, user_drive,
        attachment_client
    ):
        """Fetches all the attachments of a user chat
        :param user_id: Id of the user
        :param prefix: Title of the chat message
        :param attachment_name: Name of the attachment
        :param attachment_id: Id of the attachment
        :param chat_id: Id of chat
        :param updated_date: date of chat updated
        :param ids_list: List of ids
        :param user_drive: Dictionary of user id with drive id. It is used as a cache of the shape
            {user_id: {drive_id: item_id}} and is updated in place.
        :param attachment_client: Object of Microsoft team client
        Returns:
            attachment_list: Documents to be indexed in Workplace Search. An empty list is returned
            when nothing could be fetched, including when an error was logged.
        """
        try:
            # Reuse the cached drive id of the user when available to save an API call.
            cached_drives = user_drive.get(user_id)
            if cached_drives:
                drive_id = next(iter(cached_drives))
            else:
                user_drive_response = attachment_client.get_user_chat_attachment_drive(
                    f"{constant.GRAPH_BASE_URL}/users/{user_id}/drive"
                )
                if not user_drive_response:
                    # Without a drive there is nothing to look up; bail out instead of
                    # continuing with an unbound drive id.
                    return []

                # Logic to append user for deletion
                self.local_storage.insert_document_into_doc_id_storage(
                    ids_list, user_id, constant.USER, "", ""
                )
                drive_id = user_drive_response["id"]
                user_drive[user_id] = {drive_id: None}

                # Logic to append user drive for deletion
                self.local_storage.insert_document_into_doc_id_storage(
                    ids_list, drive_id, constant.USER_CHAT_DRIVE, user_id, ""
                )

            # Reuse the cached chat-files folder id when available, otherwise search the drive root for it.
            item_id = user_drive[user_id].get(drive_id)
            if not item_id:
                user_root_response_data = attachment_client.get_user_chat_attachment_drive_children(
                    f"{constant.GRAPH_BASE_URL}/drives/{drive_id}/items/root/children"
                )
                for child in user_root_response_data or []:
                    if child["name"] == "Microsoft Teams Chat Files":
                        item_id = child["id"]
                        user_drive[user_id][drive_id] = item_id
                        break

            if not item_id:
                return []

            # Logic to append user drive item for deletion
            self.local_storage.insert_document_into_doc_id_storage(
                ids_list, item_id, constant.USER_CHAT_DRIVE_ITEM, drive_id, user_id
            )

            final_attachment_url = (
                f"{constant.GRAPH_BASE_URL}/drives/{drive_id}/items/{item_id}/children?"
                f"$filter=name eq '{url_encode(attachment_name)}'"
            )
            attachment_response_data = attachment_client.get_user_chat_attachment_drive_children(
                final_attachment_url)
            if not attachment_response_data:
                return []

            document = attachment_response_data[0]
            file_facet = document.get("file", {})
            # Only entries carrying a "file" mapping are downloaded.
            if not file_facet or not isinstance(file_facet, dict):
                return []

            # Mime types listed in constant.MIMETYPES are not downloaded.
            if file_facet.get("mimeType") in constant.MIMETYPES:
                return []

            # NOTE(review): this request has no timeout; consider adding one.
            attachment_content_response = requests.get(
                document.get("@microsoft.graph.downloadUrl")
            )
            if not attachment_content_response:
                return []

            attachment_content = extract_api_response(
                attachment_content_response.content
            )
            attachment_dict = {
                "type": USER_CHAT_ATTACHMENT,
                "id": attachment_id,
                "title": f"{prefix}-{attachment_name}",
                "body": attachment_content or "",
                "url": document.get("webUrl"),
                "last_updated": updated_date,
                "_allow_permissions": self._allowed_permissions(chat_id),
            }

            # Logic to append user chat attachment for deletion
            self.local_storage.insert_document_into_doc_id_storage(
                ids_list,
                attachment_id,
                USER_CHAT_ATTACHMENT,
                item_id,
                drive_id,
            )
            return [attachment_dict]
        except Exception as exception:
            # Attachment fetching is best effort: the error is logged and not re-raised.
            self.logger.exception(
                f"[Fail] Error while fetching attachments for the user chats. "
                f"Error: {exception}"
            )
            return []

    def fetch_tabs(self, chat_id, ids_list, start_time, end_time):
        """Fetches user chat tabs from the Microsoft Teams
        :param chat_id: Id of the chat
        :param ids_list: List of ids
        :param start_time: Starting time for fetching data
        :param end_time: Ending time for fetching data
        Returns:
            documents: Documents to be indexed in Workplace Search
        Raises:
            Exception: Any error raised while fetching or converting the tabs is logged and re-raised
        """
        try:
            documents = []
            tab_detail_response = self.client.get_user_chat_tabs(
                f"{constant.GRAPH_BASE_URL}/chats/{chat_id}/tabs", start_time, end_time, chat_id
            )
            if not tab_detail_response:
                return documents

            tab_schema = get_schema_fields("user_tabs", self.object_type_to_index)
            for tab in tab_detail_response:
                tab_dict = {"type": USER_CHAT_TABS}
                for ws_field, ms_fields in tab_schema.items():
                    tab_dict[ws_field] = tab[ms_fields]
                tab_dict["url"] = tab["configuration"]["websiteUrl"]
                tab_dict["_allow_permissions"] = self._allowed_permissions(chat_id)
                documents.append(tab_dict)

                # Logic to append user chat tab for deletion
                self.local_storage.insert_document_into_doc_id_storage(
                    ids_list, tab["id"], USER_CHAT_TABS, chat_id, ""
                )
            return documents
        except Exception as exception:
            self.logger.exception(
                f"[Fail] Error while fetching user tabs from teams. "
                f"Error: {exception}"
            )
            raise

    def fetch_meeting_recording(self, chat_id, chat):
        """Fetches meeting recording from the Microsoft Teams
        :param chat_id: Id of the chat
        :param chat: dictionary of the user chat
        Returns:
            recording_dict: Document to be indexed in Workplace Search, or None when the message
            is not a call recording event or its recording url is not a ".sharepoint.com" url
        """
        event_detail = chat.get("eventDetail")
        if not event_detail:
            return None
        if event_detail.get("@odata.type") != "#microsoft.graph.callRecordingEventMessageDetail":
            return None

        url = event_detail.get("callRecordingUrl")
        if not url or ".sharepoint.com" not in url:
            return None

        return {
            "type": MEETING_RECORDING,
            "id": event_detail["callId"],
            "title": event_detail["callRecordingDisplayName"],
            "url": url,
            "_allow_permissions": self._allowed_permissions(chat_id),
        }

    def get_user_chats(self, ids_list):
        """Fetches user chats by calling '/Chats' api
        :param ids_list: List of ids
        Returns:
            member_dict: Dictionary mapping a member display name to the list of chat ids
                the member belongs to. Empty when no chats are returned.
            documents: List of chats returned by the api
        """
        self.logger.debug("Fetching the users chats")
        documents = []
        # member_dict: Dictionary of members with their id for adding permissions.
        # Initialised up front so that an empty response still yields a valid return value.
        member_dict = defaultdict(list)
        chat_response_data = self.client.get_user_chats(f"{constant.GRAPH_BASE_URL}/chats?$expand=members")
        if not chat_response_data:
            return member_dict, documents

        self.logger.info(
            "Fetched the user chat metadata. Attempting to extract the messages from the chats, "
            "attachments and meeting recordings.."
        )
        for chat in chat_response_data:
            for member in chat["members"]:
                display_name = member["displayName"]
                if display_name:
                    member_dict[display_name].append(chat["id"])

            # Logic to append chat for deletion
            self.local_storage.insert_document_into_doc_id_storage(
                ids_list, chat["id"], constant.CHATS, "", ""
            )
            documents.append(chat)
        return member_dict, documents

    def get_user_chat_messages(
        self,
        ids_list,
        user_drive,
        chat_response_data,
        start_time,
        end_time,
        user_attachment_token,
    ):
        """Fetches the user chat messages from Microsoft Teams
        :param ids_list: List of ids
        :param user_drive: Dictionary of dictionary
        :param chat_response_data: Chats data for fetching chat messages
        :param start_time: Starting time for fetching data
        :param end_time: Ending time for fetching data
        :param user_attachment_token: Access token for fetching the attachments
        Returns:
            documents: Documents to be indexed in Workplace Search
        Raises:
            Exception: Any error raised while fetching the messages or tabs of a chat is logged and re-raised
        """
        documents = []
        user_schema = get_schema_fields("user_chats", self.object_type_to_index)
        attachment_client = MSTeamsClient(
            self.logger, user_attachment_token, self.config
        )

        for val in chat_response_data:
            chat_id = val["id"]
            member_title = [
                member["displayName"] for member in val["members"] if member["displayName"]
            ]
            # The chat topic is used as title; chats without a topic fall back to the member names.
            title = val.get("topic") or ",".join(member_title)

            try:
                chat_detail_response = self.client.get_user_chat_messages(
                    f'{constant.GRAPH_BASE_URL}/chats/{chat_id}/messages',
                    start_time,
                    end_time,
                    chat_id,
                )
                for chat in chat_detail_response or []:
                    if chat["deletedDateTime"]:
                        continue

                    # The sender is resolved per message so that a message without a user sender
                    # never reuses the user id of a previous message.
                    sender_user = (chat["from"] or {}).get("user") or {}
                    user_id = sender_user.get("id")
                    user_name = sender_user.get("displayName") or ""

                    for attachment in chat["attachments"]:
                        name = attachment["name"]
                        # Attachments are looked up in the sender's drive, so a sender id is required.
                        if not (user_id and name and attachment["contentType"] == "reference"):
                            continue
                        attachment_document = self.get_attachments(
                            user_id,
                            title,
                            name,
                            attachment["id"],
                            chat_id,
                            chat["lastModifiedDateTime"],
                            ids_list,
                            user_drive,
                            attachment_client,
                        )
                        if attachment_document:
                            documents.extend(attachment_document)

                    chat_message = html_to_text(self.logger, chat["body"]["content"])
                    if chat_message:
                        # Logic to append chat message for deletion
                        self.local_storage.insert_document_into_doc_id_storage(
                            ids_list,
                            chat["id"],
                            constant.USER_CHATS_MESSAGE,
                            chat_id,
                            "",
                        )
                        user_dict = {"type": constant.USER_CHATS_MESSAGE}
                        for ws_field, ms_fields in user_schema.items():
                            user_dict[ws_field] = chat[ms_fields]
                        user_dict["title"] = title
                        user_dict["body"] = (
                            f"{user_name} - {chat_message}" if user_name else chat_message
                        )
                        user_dict["url"] = val["webUrl"]
                        user_dict["_allow_permissions"] = self._allowed_permissions(chat_id)
                        documents.append(user_dict)
                    else:
                        self.logger.info(
                            f"the message for the chat {chat['id']} is empty"
                        )

                    meeting_recordings = self.fetch_meeting_recording(chat_id, chat)
                    if meeting_recordings:
                        documents.append(meeting_recordings)
            except Exception as exception:
                self.logger.exception(
                    f"[Fail] Error while fetching user chats details from teams. Error: {exception}"
                )
                raise

            self.logger.info(
                f"Fetched chats, attachments and meeting recordings metadata. Attempting to fetch tabs "
                f"for chat: {val['id']}"
            )
            tabs_document = self.fetch_tabs(chat_id, ids_list, start_time, end_time)
            documents.extend(tabs_document)
            self.logger.info("Fetched the user chat tabs")
        return documents