ees_microsoft_teams/deletion_command.py

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module is used to create multithreading jobs for Microsoft Teams objects.
"""
from . import constant
from .base_command import BaseCommand
from .msal_access_token import MSALAccessToken
from .sync_microsoft_teams import SyncMicrosoftTeams
from .utils import split_documents_into_equal_chunks, is_document_in_present_data


class DeletionCommand(BaseCommand):
    """This class creates the multithreading jobs for Teams, User Chats and Calendars objects
    """

    def create_jobs_for_teams(self, thread_count, start_time, end_time, queue):
        """Creates jobs for deleting the teams and its children objects
        :param thread_count: Thread count to make partitions
        :param start_time: Start time for fetching the data
        :param end_time: End time for fetching the data
        :param queue: Shared queue for storing the data
        """
        allowed_objects = ["teams", "channels", "channel_messages", "channel_tabs", "channel_documents"]
        storage_with_collection = self.local_storage.get_documents_from_doc_id_storage("teams")
        sync_ms_teams_obj = SyncMicrosoftTeams("deletion_sync", self.config, self.logger, queue)
        if not any(teams_object in self.config.get_value("object_type_to_index") for teams_object in allowed_objects):
            return
        self.logger.debug("Started deleting the teams and its objects data...")
        microsoft_teams_object = self.microsoft_team_channel_object(self.get_access_token())
        try:
            deleted_data = storage_with_collection.get("delete_keys") or []
            global_keys_documents = storage_with_collection.get("global_keys") or []
            teams = microsoft_teams_object.get_all_teams([])
            teams_partition_list = split_documents_into_equal_chunks(
                teams, thread_count
            )
            job_documents_list = self.create_and_execute_jobs(
                thread_count,
                sync_ms_teams_obj.fetch_channels_for_deletion,
                (microsoft_teams_object,),
                teams_partition_list,
            )
            channels, channel_index_documents = [], []
            for channel_data in job_documents_list:
                channels.extend(channel_data["channels"])
                channel_index_documents.extend(channel_data["channel_documents"])
            live_data = []
            delete_keys_documents = []
            configuration_objects = self.config.get_value("object_type_to_index")
            if "teams" in configuration_objects:
                live_data.extend(teams)
            if "channels" in configuration_objects:
                live_data.extend(channel_index_documents)
            channels_partition_list = split_documents_into_equal_chunks(channels, thread_count)
            if "channel_messages" in configuration_objects:
                channel_messages = self.create_and_execute_jobs(
                    thread_count,
                    sync_ms_teams_obj.fetch_channel_messages_for_deletion,
                    (microsoft_teams_object, start_time, end_time, []),
                    channels_partition_list
                )
                live_data.extend(channel_messages)
            if "channel_tabs" in configuration_objects:
                channel_tabs = self.create_and_execute_jobs(
                    thread_count,
                    sync_ms_teams_obj.fetch_channel_tabs_for_deletion,
                    (microsoft_teams_object, start_time, end_time, []),
                    channels_partition_list
                )
                live_data.extend(channel_tabs)
            if "channel_documents" in configuration_objects:
                channel_documents = self.create_and_execute_jobs(
                    thread_count,
                    sync_ms_teams_obj.fetch_channel_documents_for_deletion,
                    (microsoft_teams_object, start_time, end_time, []),
                    teams_partition_list
                )
                live_data.extend(channel_documents)
            self.remove_deleted_documents_from_global_keys(
                live_data, deleted_data, delete_keys_documents, global_keys_documents, "", ""
            )
            queue.append_to_queue("deletion", list(delete_keys_documents))
            storage_with_collection["global_keys"] = list(global_keys_documents)
            storage_with_collection["delete_keys"] = []
            self.local_storage.update_storage(storage_with_collection, "teams")
        except Exception as exception:
            self.logger.exception(f"Error while deleting the teams or its objects data. Error: {exception}")
        self.logger.info("Completed deleting the teams and its objects data from the Workplace Search")

    def create_jobs_for_user_chats(self, thread_count, start_time, end_time, queue):
        """Creates jobs for deleting the user chats and its children objects
        :param thread_count: Thread count to make partitions
        :param start_time: Start time for fetching the data
        :param end_time: End time for fetching the data
        :param queue: Shared queue for storing the data
        """
        if "user_chats" not in self.config.get_value('object_type_to_index'):
            return
        self.logger.debug("Started deleting the user chats, meeting chats, and meeting recordings...")
        user_chat_object = self.microsoft_user_chats_object(self.get_access_token())
        storage_with_collection = self.local_storage.get_documents_from_doc_id_storage("user_chats")
        sync_ms_teams_obj = SyncMicrosoftTeams("deletion_sync", self.config, self.logger, queue)
        try:
            user_drive = {}
            _, chats = user_chat_object.get_user_chats(ids_list=[])
            chats_partition_list = split_documents_into_equal_chunks(chats, thread_count)
            user_attachment_token = MSALAccessToken(self.logger, self.config)
            user_attachment_token = user_attachment_token.get_token(
                is_acquire_for_client=True
            )
            chat_messages_documents = self.create_and_execute_jobs(
                thread_count,
                sync_ms_teams_obj.fetch_user_chat_messages_for_deletion,
                (user_chat_object, [], user_drive, start_time, end_time, user_attachment_token),
                chats_partition_list
            )
            deleted_data = storage_with_collection.get("delete_keys") or []
            global_keys_documents = storage_with_collection.get("global_keys") or []
            delete_keys_documents = []
            self.remove_deleted_documents_from_global_keys(
                chat_messages_documents, deleted_data, delete_keys_documents, global_keys_documents, "", ""
            )
            queue.append_to_queue("deletion", list(delete_keys_documents))
            storage_with_collection["global_keys"] = list(global_keys_documents)
            storage_with_collection["delete_keys"] = []
            self.local_storage.update_storage(storage_with_collection, "user_chats")
        except Exception as exception:
            self.logger.exception(
                f"Error while deleting user chats, meeting chats and meeting recordings. Error: "
                f"{exception}"
            )
        self.logger.info("Completed deleting the user chats, meeting chats and meeting recordings")

    def create_jobs_for_calendars(self, start_time, end_time, queue):
        """Creates jobs for deleting the calendar events
        :param start_time: Start time for fetching the data
        :param end_time: End time for fetching the data
        :param queue: Shared queue for storing the data
        """
        if "calendar" not in self.config.get_value("object_type_to_index"):
            return
        self.logger.debug("Started deleting the calendar events from Microsoft Teams...")
        storage_with_collection = self.local_storage.get_documents_from_doc_id_storage("calendar")
        try:
            calendar_object = self.microsoft_calendar_object(self.get_access_token(is_acquire_for_client=True))
            _, documents = calendar_object.get_calendars(ids_list=[], start_time=start_time, end_time=end_time)
            deleted_data = storage_with_collection.get("delete_keys") or []
            global_keys_documents = storage_with_collection.get("global_keys") or []
            delete_keys_documents = []
            self.remove_deleted_documents_from_global_keys(
                documents, deleted_data, delete_keys_documents, global_keys_documents, "", ""
            )
            queue.append_to_queue("deletion", list(delete_keys_documents))
            storage_with_collection["global_keys"] = list(global_keys_documents)
            storage_with_collection["delete_keys"] = []
            self.local_storage.update_storage(storage_with_collection, "calendar")
        except Exception as exception:
            self.logger.exception(f"Error while deleting the calendars. Error: {exception}")
        self.logger.info("Completed deleting the calendar events from Microsoft Teams")

    def remove_deleted_documents_from_global_keys(
        self, live_documents, list_ids_documents, deleted_documents, global_keys_documents, parent_id, super_parent_id
    ):
        """Updates the local storage by removing the keys that were deleted from Microsoft Teams
        :param live_documents: Documents present in Microsoft Teams
        :param list_ids_documents: Documents present in respective doc_ids.json files
        :param deleted_documents: Document list that were deleted from Microsoft Teams
        :param global_keys_documents: Document list that are present in doc_ids.json
        :param parent_id: Parent id of the document
        :param super_parent_id: Super parent id of the document
        """
        parent_items = list(filter(lambda seq: is_document_in_present_data(
            seq, parent_id, "parent_id"), list_ids_documents))
        for item in parent_items:
            item_id = item["id"]
            parent_id = item["parent_id"]
            super_parent_id = item["super_parent_id"]
            type = item["type"]
            present_items = list(filter(
                lambda seq: is_document_in_present_data(seq, item_id, "id"), live_documents))
            if (len(present_items) == 0 and type not in [
                    constant.CHATS, constant.USER, constant.USER_CHAT_DRIVE,
                    constant.USER_CHAT_DRIVE_ITEM, constant.CHANNEL_DRIVE,
                    constant.CHANNEL_ROOT, constant.CHANNEL_DRIVE_ITEM]):
                deleted_documents.append(item_id)
                if item in global_keys_documents:
                    global_keys_documents.remove(item)
            # Recursively call the function for the children of the current item
            self.remove_deleted_documents_from_global_keys(
                live_documents, list_ids_documents, deleted_documents, global_keys_documents, item_id, super_parent_id
            )
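

# The guarded block below is an illustrative sketch only and is not part of the
# connector: it mimics, on made-up in-memory data, how
# remove_deleted_documents_from_global_keys() walks the parent/child hierarchy
# kept in the doc_ids storage and collects the ids of documents that are no
# longer present in Microsoft Teams. Every id, field value, and helper name here
# is hypothetical, and the block depends on nothing from the module above.
if __name__ == "__main__":
    stored_documents = [
        {"id": "team-1", "parent_id": ""},
        {"id": "channel-1", "parent_id": "team-1"},
        {"id": "message-1", "parent_id": "channel-1"},
    ]
    live_ids = {"team-1"}  # pretend only the team is still present in Microsoft Teams

    def collect_deleted_ids(parent_id, deleted):
        """Recursively flag stored children of ``parent_id`` that are no longer live."""
        for item in [doc for doc in stored_documents if doc["parent_id"] == parent_id]:
            if item["id"] not in live_ids:
                deleted.append(item["id"])
            collect_deleted_ids(item["id"], deleted)
        return deleted

    print(collect_deleted_ids("", []))  # -> ['channel-1', 'message-1']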