ees_microsoft_teams/sync_microsoft_teams.py (90 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # """This module allows to sync data to Elastic Enterprise Search. It's possible to run full syncs and incremental syncs with this module. """ import csv import os from . import constant from .local_storage import LocalStorage class SyncMicrosoftTeams: """Fetches the Microsoft Teams documents and its permissions and store them into queue.""" def __init__(self, indexing_type, config, logger, queue): self.logger = logger self.config = config self.objects = config.get_value("object_type_to_index") self.permission = config.get_value("enable_document_permission") self.indexing_type = indexing_type self.local_storage = LocalStorage(config) self.queue = queue def add_permissions_to_queue(self, user, roles): """This method is used to map the Microsoft Teams users to workplace search users and responsible to call the user permissions indexer method :param user: User for indexing the permissions :param roles: User roles """ rows = {} mapping_sheet_path = self.config.get_value("microsoft_teams.user_mapping") if ( mapping_sheet_path and os.path.exists(mapping_sheet_path) and os.path.getsize(mapping_sheet_path) > 0 ): with open(mapping_sheet_path, encoding="UTF-8") as file: for row in csv.reader(file): rows[row[0]] = row[1] user_name = rows.get(user, user) permission_dict = {"user": user_name, "roles": roles} self.queue.append_to_queue("permissions", permission_dict) def fetch_user_chats(self, chats_obj, ids_list): """Fetches user chats from Microsoft Teams :param chats_obj: Chats class object to fetch the chats :param ids_list: Document ids list from respective doc id file """ user_permissions, chats = chats_obj.get_user_chats(ids_list) return user_permissions, chats def fetch_user_chat_messages( self, chats_obj, ids_list, user_drive, start_time, end_time, user_attachment_token, chats ): """Fetches user chat messages and other chat objects from Microsoft Teams :param chats: List of chats to fetch its children objects :param chats_obj: Chats class object to fetch the chats :param ids_list: Document ids list from respective doc id file :param user_drive: User Drive to store user related details :param start_time: Start time for fetching the user chats data :param end_time: End time for fetching the user chats data :param user_attachment_token: Access token for fetching the user chat attachments """ documents = chats_obj.get_user_chat_messages( ids_list, user_drive, chats, start_time, end_time, user_attachment_token ) self.queue.append_to_queue(constant.USER_CHATS_MESSAGE, documents) def fetch_user_chat_messages_for_deletion( self, chats_obj, ids_list, user_drive, start_time, end_time, user_attachment_token, chats ): """Fetches user chat messages and other chat objects from Microsoft Teams for deletion :param chats: List of chats to fetch its children objects :param chats_obj: Chats class object to fetch the chats :param ids_list: Document ids list from respective doc id file :param user_drive: User Drive to store user related details :param start_time: Start time for fetching the user chats data :param end_time: End time for fetching the user chats data :param user_attachment_token: Access token for fetching the user chat attachments """ return chats_obj.get_user_chat_messages( ids_list, user_drive, chats, start_time, end_time, user_attachment_token ) def fetch_teams(self, teams_obj, ids_list): """Fetches teams from Microsoft Teams :param teams_obj: Class object to fetch teams and its objects :param ids_list: Document ids list from respective doc id file """ teams = teams_obj.get_all_teams(ids_list) if "teams" in self.objects: self.queue.append_to_queue(constant.TEAMS, teams) return teams def fetch_channels(self, teams_obj, ids_list, teams): """Fetches channels from Microsoft Teams :param teams: List of teams to fetch the channels :param teams_obj: Class object to fetch teams and its objects :param ids_list: Document ids list from respective doc id file """ channels, channel_documents = teams_obj.get_team_channels(teams, ids_list) if "channels" in self.objects: self.queue.append_to_queue(constant.CHANNELS, channel_documents) return channels def fetch_channels_for_deletion(self, teams_obj, teams): """Fetches channels from Microsoft Teams for deletion :param teams: List of teams to fetch the channels :param teams_obj: Class object to fetch teams and its objects """ channels, channel_documents = teams_obj.get_team_channels(teams, []) return [{"channels": channels, "channel_documents": channel_documents}] def fetch_channel_messages(self, teams_obj, start_time, end_time, ids_list, channels): """Fetches channel messages from Microsoft Teams :param teams_obj: Class object to fetch teams and its objects :param start_time: Start time for fetching channel messages :param end_time: End time for fetching channel messages :param ids_list: Document ids list from respective doc id file :param channels: List of channels to fetch channel messages from Microsoft Teams """ channel_message_documents = teams_obj.get_channel_messages( channels, ids_list, start_time, end_time ) self.queue.append_to_queue(constant.CHANNEL_MESSAGES, channel_message_documents) def fetch_channel_messages_for_deletion(self, teams_obj, start_time, end_time, ids_list, channels): """Fetches channel messages from Microsoft Teams for deletion :param channels: List of channels to fetch channel messages from Microsoft Teams :param teams_obj: Class object to fetch teams and its objects :param start_time: Start time for fetching channel messages :param end_time: End time for fetching channel messages :param ids_list: Document ids list from respective doc id file """ return teams_obj.get_channel_messages( channels, ids_list, start_time, end_time ) def fetch_channel_tabs(self, teams_obj, start_time, end_time, ids_list, channels): """Fetches channel tabs from Microsoft Teams :param teams_obj: Class object to fetch teams and its objects :param start_time: Start time for fetching channel tabs :param end_time: End time for fetching channel tabs :param ids_list: Document ids list from respective doc id file :param channels: List of channels to fetch channel tabs from Microsoft Teams """ tab_documents = teams_obj.get_channel_tabs( channels, ids_list, start_time, end_time ) self.queue.append_to_queue(constant.CHANNEL_TABS, tab_documents) def fetch_channel_tabs_for_deletion(self, teams_obj, start_time, end_time, ids_list, channels): """Fetches channel tabs from Microsoft Teams for deletion :param channels: List of channels to fetch channel messages and tabs from Microsoft Teams :param teams_obj: Class object to fetch teams and its objects :param start_time: Start time for fetching channel messages and tabs :param end_time: End time for fetching channel messages and tabs :param ids_list: Document ids list from respective doc id file """ return teams_obj.get_channel_tabs( channels, ids_list, start_time, end_time ) def fetch_channel_documents(self, teams_obj, start_time, end_time, ids_list, teams): """Fetches channel documents from Microsoft Teams :param teams: List of teams to fetch channels from Microsoft Teams :param teams_obj: Class object to fetch teams and its objects :param start_time: Start time for fetching channel documents :param end_time: End time for fetching channel documents :param ids_list: Document ids list from respective doc id file """ channel_documents = teams_obj.get_channel_documents( teams, ids_list, start_time, end_time ) self.queue.append_to_queue(constant.CHANNEL_DOCUMENTS, channel_documents) def fetch_channel_documents_for_deletion(self, teams_obj, start_time, end_time, ids_list, teams): """Fetches channel documents from Microsoft Teams for deletion :param teams: List of teams to fetch channels from Microsoft Teams :param teams_obj: Class object to fetch teams and its objects :param start_time: Start time for fetching channel documents :param end_time: End time for fetching channel documents :param ids_list: Document ids list from respective doc id file """ return teams_obj.get_channel_documents( teams, ids_list, start_time, end_time ) def fetch_calendars(self, calendar_obj, ids_list, start_time, end_time): """Fetches calendar events from Microsoft Teams :param calendar_obj: Class object to fetch calendar events :param ids_list: Document ids list from respective doc id file :param start_time: Start time for fetching calendar events :param end_time: End time for fetching calendar events """ calendar_permissions, documents = calendar_obj.get_calendars( ids_list, start_time, end_time ) self.queue.append_to_queue(constant.CALENDAR, documents) return calendar_permissions def sync_permissions(self, user_permissions): """Sync permissions of Microsoft Objects to Workplace Search :param user_permissions: Dictionary having the user permissions to be indexed into Workplace Search """ for user, permissions in user_permissions.items(): self.add_permissions_to_queue(user, permissions)