# ees_microsoft_teams/microsoft_teams_channels.py
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module collects all the teams and Channels detail from Microsoft Teams.
"""
import dateparser
import requests
from iteration_utilities import unique_everseen
from requests.exceptions import RequestException
from tika.tika import TikaException
from . import constant
from .microsoft_teams_client import MSTeamsClient
from .utils import (get_data_from_http_response, get_schema_fields, extract_api_response,
html_to_text, url_decode)
MEETING_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
CHANNEL_MEETINGS = "Channel Meetings"
class MSTeamsChannels:
    """This class fetches all the teams and channels data from Microsoft Teams.
    """

    def __init__(self, access_token, logger, config, local_storage):
        """Set up the Microsoft Graph client and sync configuration.
        :param access_token: OAuth access token used for Microsoft Graph API calls
        :param logger: Logger shared across the connector
        :param config: Connector configuration object, read via ``get_value``
        :param local_storage: Storage used to record indexed document ids for later deletion
        """
        self.access_token = access_token
        self.client = MSTeamsClient(logger, self.access_token, config)
        self.logger = logger
        # Which Microsoft Teams object types the user chose to index
        self.object_type_to_index = config.get_value('object_type_to_index')
        # When enabled, each document carries an _allow_permissions list
        self.is_permission_sync_enabled = config.get_value("enable_document_permission")
        self.local_storage = local_storage
def get_all_teams(self, ids_list):
""" Fetches all the teams from Microsoft Teams
:param ids_list: Shared storage for storing the document ids
Returns:
teams_details: List of dictionaries containing the team details
"""
self.logger.info("Fetching teams from Microsoft Teams...")
documents = []
response = self.client.get_teams(next_url=f"{constant.GRAPH_BASE_URL}/groups")
if not response:
return []
team_schema = get_schema_fields("teams", self.object_type_to_index)
for team in response:
team_data = {"type": constant.TEAMS}
# Logic to append teams for deletion
self.local_storage.insert_document_into_doc_id_storage(
ids_list=ids_list, id=team["id"], type=constant.TEAMS
)
for workplace_search_field, microsoft_teams_field in team_schema.items():
team_data[workplace_search_field] = team[microsoft_teams_field]
team_data["_allow_permissions"] = []
if self.is_permission_sync_enabled:
team_data["_allow_permissions"] = [team["id"]]
documents.append(team_data)
return documents
def get_team_members(self):
""" Fetches the team members from Microsoft Teams
Returns:
member_list: List containing all the team members
"""
self.logger.info("Fetching team members from Microsoft Teams")
member_list = {}
response = self.client.get_teams(next_url=f"{constant.GRAPH_BASE_URL}/groups")
if not response:
return member_list
for team in response:
self.logger.info(f"Fetching members for the team: {team['displayName']}")
team_id = team["id"]
response = self.client.get_teams(
next_url=f"{constant.GRAPH_BASE_URL}/teams/{team_id}/members"
)
if not response:
continue
for member in response:
display_name = member["displayName"]
if member_list.get(display_name):
member_list[display_name].append(team_id)
else:
member_list[display_name] = [team_id]
return member_list
def get_team_channels(self, teams, ids_list):
""" Fetches all the team channels from the Microsoft Teams
:param teams: List of dictionaries containing the team details
:param ids_list: Shared storage for storing the document ids
Returns:
documents_with_teams: List of dictionaries containing the team channel details
documents: Documents to be indexed in Workplace Search
"""
documents = []
documents_with_teams = []
for team in teams:
team_id = team["id"]
team_name = team["title"]
self.logger.info(f"Fetching channels for team: {team_name}")
response = self.client.get_channels(
next_url=f"{constant.GRAPH_BASE_URL}/teams/{team_id}/channels"
)
if not response:
continue
channel_schema = get_schema_fields("channels", self.object_type_to_index)
channels_by_team = {team_id: []}
for channel in response:
# Logic to append channels for deletion
self.local_storage.insert_document_into_doc_id_storage(
ids_list, channel["id"], constant.CHANNELS, team_id, ""
)
channel_data = {"type": constant.CHANNELS}
for workplace_search_field, microsoft_teams_field in channel_schema.items():
channel_data[workplace_search_field] = channel[microsoft_teams_field]
channel_data["_allow_permissions"] = []
if self.is_permission_sync_enabled:
channel_data["_allow_permissions"] = [team_id]
documents.append(channel_data)
channels_by_team[team_id].append(channel_data)
documents_with_teams.append(channels_by_team)
return documents_with_teams, documents
def get_channel_messages(self, team_channels_list, ids_list, start_time, end_time):
""" Fetches all the channel messages from the Microsoft Teams
:param team_channels_list: List of dictionaries containing team_id as a key and
channels of that team as a value
:param ids_list: Shared storage for storing the document ids
:param start_time: Starting time for fetching data
:param end_time: Ending time for fetching data
Returns:
documents: List of dictionaries containing the channel messages details
"""
self.logger.debug(
f"Fetching channel messages for the interval of start time: {start_time} and end time: {end_time}.")
documents = []
for team_channel_map in team_channels_list:
for team_id, channel_list in team_channel_map.items():
for channel in channel_list:
channel_id = channel["id"]
channel_name = channel["title"]
self.logger.info(f"Fetching the channel messages for channel: {channel_name}")
response = self.client.get_channel_messages(
next_url=f"{constant.GRAPH_BASE_URL}/teams/{team_id}/channels/{channel_id}/messages",
channel_name=channel_name, start_time=start_time, end_time=end_time)
if response:
documents = self.get_channel_messages_documents(
response, channel, ids_list, team_id, start_time, end_time, documents)
return documents
    def get_channel_messages_documents(
        self, message_response_data, channel, ids_list, team_id, start_time, end_time, documents
    ):
        """Prepares a Workplace Search document for channel messages to be indexed
        :param message_response_data: Response data to prepare a workplace search document
        :param channel: Channel for fetching the channel messages
        :param ids_list: Shared storage for storing the document ids
        :param team_id: Id of the team the channel belongs to
        :param start_time: Starting time for fetching data
        :param end_time: Ending time for fetching data
        :param documents: Document to be indexed into the Workplace Search
        Returns:
            documents: Document to be indexed into the Workplace Search
        """
        channel_id = channel["id"]
        channel_name = channel["title"]
        channel_message_schema = get_schema_fields("channel_messages", self.object_type_to_index)
        for message in message_response_data:
            message_data = {"type": constant.CHANNEL_MESSAGES}
            # Deleted messages are skipped entirely
            if not message["deletedDateTime"]:
                content = html_to_text(self.logger, message["body"]["content"])
                attachments = message.get("attachments")
                # A message with an eventDetail carrying a callEventType is a
                # channel meeting event rather than a regular message
                is_meeting = message.get("eventDetail") and message.get(
                    "eventDetail", {}).get("callEventType")
                if content or attachments or is_meeting:
                    if content or attachments:
                        self.logger.info("Extracting html/text messages...")
                        sender = message["from"]["user"]["displayName"]
                        # NOTE(review): attachments may be None here (only `content` is
                        # guaranteed truthy in that case) — get_attachment_names must
                        # tolerate it; confirm against its implementation.
                        attachment_names = self.get_attachment_names(attachments)
                        message_data["title"] = channel_name
                        # If the message has attachments in it, set the message body format to
                        # `sender - attachments`
                        message_data["body"] = f"{sender} - {attachment_names}\n"
                        if content and attachments:
                            # If the message has both content and attachments, set the message
                            # body format to `sender - attachments - message`
                            message_data["body"] += f"Message: {content}\n"
                        elif content:
                            # If the message has just content and no attachments, replace the
                            # message body format with `sender - message`
                            message_data["body"] = f"{sender} - {content}"
                    else:
                        # Meeting event: index it as a "Channel Meetings" document with a
                        # human-readable meeting timestamp in the title
                        self.logger.info(
                            f"Extracting meeting details for channel: {channel['title']} from "
                            "Microsoft Teams...")
                        message_data["type"] = CHANNEL_MEETINGS
                        meeting_time = message['createdDateTime']
                        formatted_datetime = dateparser.parse(meeting_time).strftime(
                            "%d %b, %Y at %H:%M:%S")
                        message_data["title"] = f"{channel['title']} - Meeting On "\
                            f"{formatted_datetime}"
                    # Logic to append channel messages for deletion
                    self.local_storage.insert_document_into_doc_id_storage(
                        ids_list, message["id"],
                        constant.CHANNEL_MESSAGES, channel_id, team_id)
                    for workplace_search_field, microsoft_teams_field in channel_message_schema.items():
                        message_data[workplace_search_field] = message[microsoft_teams_field]
                    message_data["_allow_permissions"] = []
                    if self.is_permission_sync_enabled:
                        message_data["_allow_permissions"] = [team_id]
                    # Replies are folded into the parent message's body; the body format
                    # depends on which branch built the original body above
                    replies_data = self.get_message_replies(
                        team_id, channel_id, message['id'], start_time, end_time)
                    if replies_data:
                        if attachments:
                            message_data["body"] += f"Attachment Replies:\n{replies_data}"
                        elif content:
                            message_data["body"] = f"{sender} - {content}\nReplies:\n"\
                                f"{replies_data}"
                        else:
                            message_data["body"] = f"Meeting Messages:\n{replies_data}"
                    if message_data:
                        documents.append(message_data)
        return documents
def get_attachment_names(self, attachments):
"""Convert multiple attachment names into comma separated name
:param attachments: Attachment object for fetching the attachment names
Returns:
attachment_names: List of channel attachments
"""
attachment_list = []
for attachment in attachments:
if attachment["contentType"] == "tabReference":
attachment["name"] = url_decode(attachment["name"])
attachment_list.append(attachment["name"])
attachment_names = ", ".join(attachment_list)
return attachment_names
def get_message_replies(self, team_id, channel_id, message_id, start_time, end_time):
""" Fetches the replies of a specific channel message.
:param team_id: Team id
:param channel_id: Channel id
:param message_id: Parent message id
:param start_time: Starting time for fetching data
:param end_time: Ending time for fetching data
Returns:
message_body: List of message replies
"""
self.logger.info(f"Fetching message replies for message id: {message_id}...")
replies_list = []
replies_url = f"{constant.GRAPH_BASE_URL}/teams/{team_id}/channels/{channel_id}/messages/{message_id}/replies"
response = self.client.get_channel_messages(
next_url=replies_url,
start_time=start_time,
end_time=end_time,
is_message_replies=True
)
parsed_response = get_data_from_http_response(
logger=self.logger,
response=response,
error_message="Could not fetch the channel message replies.",
exception_message="Error while fetching the channel message replies."
)
if not parsed_response:
return ""
for reply in parsed_response:
reply_content = html_to_text(self.logger, reply["body"]["content"])
if reply_content:
sender = reply["from"]["user"]["displayName"]
replies_list.append(f"{sender} - {reply_content}")
message_body = "\n".join(reply for reply in replies_list)
return message_body
def get_channel_tabs(self, team_channels_list, ids_list, start_time, end_time):
""" Fetches the channel tabs from the Microsoft Teams.
:param team_channels_list: List of dictionaries containing team_id as a key and
channels of that team as a value
:param ids_list: Shared storage for storing the document ids
:param start_time: Starting time for fetching data
:param end_time: Ending time for fetching data
Returns:
documents: Documents to be indexed in Workplace Search
"""
self.logger.debug(
f"Fetching channel tabs for the interval of start time: {start_time} and end time: {end_time}.")
documents = []
tabs_schema = get_schema_fields("channel_tabs", self.object_type_to_index)
for team_channel_map in team_channels_list:
for team_id, channel_list in team_channel_map.items():
for channel in channel_list:
channel_id = channel["id"]
channel_name = channel['title']
self.logger.info(f"Fetching the tabs for channel: {channel_name}")
response = self.client.get_channel_tabs(
next_url=f"{constant.GRAPH_BASE_URL}/teams/{team_id}/channels/{channel_id}/tabs",
start_time=start_time,
end_time=end_time,
channel_name=channel_name
)
if not response:
continue
for tab in response:
# Logic to append channel tabs for deletion
self.local_storage.insert_document_into_doc_id_storage(
ids_list, tab["id"], constant.CHANNEL_TABS, channel_id, team_id)
tabs_data = {"type": constant.CHANNEL_TABS}
for workplace_search_field, microsoft_teams_field in tabs_schema.items():
if workplace_search_field == "title":
tabs_data[workplace_search_field] = f"{channel_name}" \
f"-{tab[microsoft_teams_field]}"
else:
tabs_data[workplace_search_field] = tab[microsoft_teams_field]
tabs_data["_allow_permissions"] = []
if self.is_permission_sync_enabled:
tabs_data["_allow_permissions"] = [team_id]
documents.append(tabs_data)
return documents
    def get_channel_documents(self, teams, ids_list, start_time, end_time):
        """ Fetches all the channel documents from the Microsoft Teams
        :param teams: List of dictionaries containing the team details
        :param ids_list: Shared storage for storing the document ids
        :param start_time: Starting time for fetching data
        :param end_time: Ending time for fetching data
        Returns:
            documents: Documents to be indexed in Workplace Search
        """
        documents = []
        self.logger.debug(
            f"Fetching channel documents for the interval of start time: {start_time} and end time: {end_time}."
        )
        # Traversal order per team: drives -> each drive's root folder -> the
        # root's children -> recurse into folders via get_folder_documents.
        for team in teams:
            team_id = team["id"]
            team_name = team["title"]
            self.logger.info(f"Fetching drives for team: {team_name}")
            drive_response = self.client.get_channel_drives_and_children(
                next_url=f"{constant.GRAPH_BASE_URL}/groups/{team_id}/drives", object_type=constant.DRIVE)
            drive_response_data = get_data_from_http_response(
                self.logger, drive_response,
                f"Could not fetch channels document drives for team:{team_id} Error: {drive_response}",
                f"Error while fetching channels document drives for team: {team_id} Error: {drive_response}")
            if drive_response_data:
                for drive in drive_response_data:
                    drive_id = drive["id"]
                    self.logger.info(f"Fetching root for drive: {drive['name']}")
                    # Logic to append team drives ids for deletion
                    self.local_storage.insert_document_into_doc_id_storage(ids_list, drive_id, constant.CHANNEL_DRIVE,
                                                                           team_id, "")
                    root_response = self.client.get(
                        url=f"{constant.GRAPH_BASE_URL}/groups/{team_id}/drives/{drive_id}/root",
                        object_type=constant.ROOT)
                    if root_response:
                        root_id = root_response["id"]
                        self.logger.info(f"Fetching channel drives for root: {root_response['name']}")
                        # Logic to append drive roots ids for deletion
                        self.local_storage.insert_document_into_doc_id_storage(
                            ids_list, root_id, constant.CHANNEL_ROOT, drive_id, team_id)
                        children_response = self.client.get_channel_drives_and_children(
                            next_url=f"{constant.GRAPH_BASE_URL}/groups/{team_id}/drives/{drive_id}/items/"
                                     f"{root_id}/children", object_type=constant.DRIVE)
                        children_response_data = get_data_from_http_response(
                            self.logger, children_response,
                            f"Could not fetch channels document drive items for team:{team_id} "
                            f"Error: {children_response}",
                            f"Error while fetching channels document drive items for team: {team_id} "
                            f"Error: {children_response}")
                        if children_response_data:
                            document_schema = get_schema_fields("channel_documents", self.object_type_to_index)
                            for child in children_response_data:
                                # Logic to append drive item ids for deletion
                                self.local_storage.insert_document_into_doc_id_storage(
                                    ids_list, child["id"], constant.CHANNEL_DRIVE_ITEM, root_id, drive_id)
                                folder_documents = self.get_folder_documents(
                                    team_id, drive_id, child["id"],
                                    document_schema, [],
                                    ids_list, child["id"],
                                    start_time, end_time, team_name)
                                if folder_documents:
                                    documents.extend(folder_documents)
        # Recursion can visit the same item more than once; de-duplicate while
        # preserving order (documents are dicts, hence unique_everseen not set())
        return list(unique_everseen(documents))
def get_folder_documents(
self, team_id, drive_id, document_id, schema, documents, ids_list, parent_file_id, start_time,
end_time, team_name):
""" Fetches the files from the folder recursively
:param team_id: Team id
:param drive_id: Drive id
:param document_id: Folder id
:param schema: Schema for workplace fields and Microsoft Teams fields
:param documents: Document id storage
:param ids_list: Shared storage for storing the document ids
:param parent_file_id: Parent document id of current file/folder
:param start_time: Starting time for fetching data
:param end_time: Ending time for fetching data
:param team_name: Team name for log message
Returns:
documents: list of documents containing the channel documents details
"""
folder_files_url = f"{constant.GRAPH_BASE_URL}/groups/{team_id}/drives/{drive_id}/items/{document_id}/children"
folder_files_response = self.client.get_channel_documents(
next_url=folder_files_url, start_time=start_time, end_time=end_time,
object_type=constant.CHANNEL_DOCUMENTS, team_name=team_name)
if not folder_files_response:
return documents
for document in folder_files_response:
# Logic to append recursive files/folders for deletion
self.local_storage.insert_document_into_doc_id_storage(
ids_list, document["id"], constant.CHANNEL_DOCUMENTS, document_id, parent_file_id)
document_data = {"type": constant.CHANNEL_DOCUMENTS}
if document.get("folder") and type(document.get("folder")) != float:
self.get_folder_documents(
team_id, drive_id, document["id"],
schema, documents, ids_list, document_id, start_time, end_time, team_name)
for workplace_search_field, microsoft_teams_filed in schema.items():
document_data[workplace_search_field] = document[microsoft_teams_filed]
document_data["_allow_permissions"] = []
if self.is_permission_sync_enabled:
document_data["_allow_permissions"] = [team_id]
document_data["body"] = self.get_attachment_content(document)
documents.append(document_data)
return documents
def get_attachment_content(self, document):
""" This function is used to fetch and extract the channel document from download URL
:param document: document that contains the details of channel document
Returns:
attachment_content: content of the attachment
"""
is_file = document.get("file", {})
# validate if file is exractable and not null
if is_file and type(is_file) != float:
mimetype = is_file.get("mimeType")
if mimetype not in constant.MIMETYPES:
download_url = document.get("@microsoft.graph.downloadUrl")
try:
attachment_content_response = requests.get(download_url)
if attachment_content_response:
attachment_content = None
try:
self.logger.info(f"Extracting the contents of {document.get('name')}.")
attachment_content = extract_api_response(attachment_content_response.content)
except TikaException as exception:
self.logger.exception(
f"Error while extracting contents of {document['name']} via Tika Parser. Error: "
f"{exception}")
return attachment_content
except RequestException as exception:
self.logger.exception(
f"Error while downloading the channel document from download URL: {download_url}. Error: "
f"{exception}")
raise