ees_microsoft_teams/ingest_command.py (192 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module is used to create multithreading jobs for Microsoft Teams objects.
"""
from .base_command import BaseCommand
from .msal_access_token import MSALAccessToken
from .utils import split_documents_into_equal_chunks


class IngestCommand(BaseCommand):
    """This class creates the multithreading jobs for Teams, User Chats and
    Calendars objects.

    Each ``create_jobs_for_*`` method follows the same outline: return early
    when none of its object types is configured in ``object_type_to_index``,
    load the stored document ids for that object type, fetch the data, write
    the ids back to local storage and finally put a checkpoint on the queue.
    """

    def create_jobs_for_teams(
        self,
        indexing_type,
        sync_microsoft_teams,
        thread_count,
        start_time,
        end_time,
        queue,
    ):
        """Creates jobs for fetching the teams and its children objects

        Errors raised while fetching are logged and not re-raised; in that
        case the id storage and the checkpoint are left untouched. The final
        "Completed" message is logged whether or not the fetch succeeded.

        :param indexing_type: The type of the indexing i.e. Full or Incremental
        :param sync_microsoft_teams: Object for fetching the Microsoft Teams object
        :param thread_count: Thread count to make partitions
        :param start_time: Start time for fetching the data
        :param end_time: End time for fetching the data
        :param queue: Shared queue for storing the data
        """
        allowed_objects = (
            "teams",
            "channels",
            "channel_messages",
            "channel_tabs",
            "channel_documents",
        )
        # Read the configured object types once and reuse the value for the
        # guard below and for the per-object checks further down.
        configuration_objects = self.config.get_value("object_type_to_index")
        if not any(
            teams_object in configuration_objects
            for teams_object in allowed_objects
        ):
            return

        storage_with_collection = self.local_storage.get_documents_from_doc_id_storage(
            "teams"
        )
        ids_list = storage_with_collection.get("global_keys", [])

        self.logger.debug("Started fetching the teams and its objects data...")
        microsoft_teams_object = self.microsoft_team_channel_object(
            self.get_access_token()
        )
        try:
            if self.config.get_value("enable_document_permission"):
                user_permissions = microsoft_teams_object.get_team_members()
                sync_microsoft_teams.sync_permissions(user_permissions)

            teams = sync_microsoft_teams.fetch_teams(microsoft_teams_object, ids_list)
            teams_partition_list = split_documents_into_equal_chunks(
                teams, thread_count
            )

            # Channels are always fetched because the message and tab jobs
            # below are partitioned over them.
            # NOTE(review): presumably create_and_execute_jobs returns the
            # channels gathered by all jobs - confirm in BaseCommand.
            channels = self.create_and_execute_jobs(
                thread_count,
                sync_microsoft_teams.fetch_channels,
                (microsoft_teams_object, ids_list),
                teams_partition_list,
            )
            channels_partition_list = split_documents_into_equal_chunks(
                channels, thread_count
            )

            if "channel_messages" in configuration_objects:
                self.create_and_execute_jobs(
                    thread_count,
                    sync_microsoft_teams.fetch_channel_messages,
                    (microsoft_teams_object, start_time, end_time, ids_list),
                    channels_partition_list,
                )

            if "channel_tabs" in configuration_objects:
                self.create_and_execute_jobs(
                    thread_count,
                    sync_microsoft_teams.fetch_channel_tabs,
                    (microsoft_teams_object, start_time, end_time, ids_list),
                    channels_partition_list,
                )

            if "channel_documents" in configuration_objects:
                # Unlike messages and tabs, the document jobs are partitioned
                # over the teams, not over the channels.
                self.create_and_execute_jobs(
                    thread_count,
                    sync_microsoft_teams.fetch_channel_documents,
                    (microsoft_teams_object, start_time, end_time, ids_list),
                    teams_partition_list,
                )

            # Persist the ids first, then record the checkpoint.
            storage_with_collection["global_keys"] = list(ids_list)
            self.local_storage.update_storage(storage_with_collection, "teams")

            self.logger.debug("Saving the checkpoint for Teams and its objects")
            queue.put_checkpoint("teams", end_time, indexing_type)
        except Exception as exception:
            self.logger.exception(
                f"Error while fetching the teams or it's objects data. Error: {exception}"
            )

        self.logger.info(
            "Completed fetching of teams and it's objects data to the Workplace Search"
        )

    def create_jobs_for_user_chats(
        self,
        indexing_type,
        sync_microsoft_teams,
        thread_count,
        start_time,
        end_time,
        queue,
    ):
        """Creates jobs for fetching the user chats and its children objects

        Errors raised while fetching are logged and not re-raised; in that
        case the id storage and the checkpoint are left untouched. The final
        "Completed" message is logged whether or not the fetch succeeded.

        :param indexing_type: The type of the indexing i.e. Full or Incremental
        :param sync_microsoft_teams: Object for fetching the Microsoft Teams object
        :param thread_count: Thread count to make partitions
        :param start_time: Start time for fetching the data
        :param end_time: End time for fetching the data
        :param queue: Shared queue for storing the data
        """
        if "user_chats" not in self.config.get_value("object_type_to_index"):
            return

        self.logger.debug(
            "Started fetching the user chats, meeting chats, and meeting recordings..."
        )
        user_chat_object = self.microsoft_user_chats_object(
            self.get_access_token()
        )

        storage_with_collection = self.local_storage.get_documents_from_doc_id_storage(
            "user_chats"
        )
        ids_list = storage_with_collection.get("global_keys", [])

        try:
            user_permissions, chats = sync_microsoft_teams.fetch_user_chats(
                user_chat_object, ids_list
            )
            if self.config.get_value("enable_document_permission"):
                sync_microsoft_teams.sync_permissions(user_permissions)

            chats_partition_list = split_documents_into_equal_chunks(
                chats, thread_count
            )

            # A separate token, acquired with is_acquire_for_client=True, is
            # handed to the chat message jobs.
            token_provider = MSALAccessToken(self.logger, self.config)
            user_attachment_token = token_provider.get_token(
                is_acquire_for_client=True
            )

            # One dict instance is passed to every chat message job.
            user_drive = {}
            self.create_and_execute_jobs(
                thread_count,
                sync_microsoft_teams.fetch_user_chat_messages,
                (
                    user_chat_object,
                    ids_list,
                    user_drive,
                    start_time,
                    end_time,
                    user_attachment_token,
                ),
                chats_partition_list,
            )

            # Persist the ids first, then record the checkpoint.
            storage_with_collection["global_keys"] = list(ids_list)
            self.local_storage.update_storage(
                storage_with_collection, "user_chats"
            )

            self.logger.debug("Saving the checkpoint for User Chats")
            queue.put_checkpoint("user_chats", end_time, indexing_type)
        except Exception as exception:
            self.logger.exception(
                f"Error while indexing user chats, meeting chats and meeting recordings. Error: "
                f"{exception}"
            )

        self.logger.info(
            "Completed fetching the user chats, meeting chats and meeting recordings"
        )

    def create_jobs_for_calendars(
        self, indexing_type, sync_microsoft_teams, start_time, end_time, queue
    ):
        """Creates jobs for fetching the calendar events

        Errors raised while fetching are logged and not re-raised; in that
        case the id storage and the checkpoint are left untouched. The final
        "Completed" message is logged whether or not the fetch succeeded.

        :param indexing_type: The type of the indexing i.e. Full or Incremental
        :param sync_microsoft_teams: Object for fetching the Microsoft Teams object
        :param start_time: Start time for fetching the data
        :param end_time: End time for fetching the data
        :param queue: Shared queue for storing the data
        """
        # Check the configuration before logging anything, as the teams and
        # user chats methods do, so a disabled calendar sync stays silent.
        if "calendar" not in self.config.get_value("object_type_to_index"):
            return

        self.logger.debug("Started fetching the calendar events from Microsoft Teams...")

        storage_with_collection = self.local_storage.get_documents_from_doc_id_storage(
            "calendar"
        )
        ids_list = storage_with_collection.get("global_keys", [])

        try:
            calendar_object = self.microsoft_calendar_object(
                self.get_access_token(is_acquire_for_client=True)
            )
            calendar_permissions = sync_microsoft_teams.fetch_calendars(
                calendar_object, ids_list, start_time, end_time
            )

            if self.config.get_value("enable_document_permission"):
                sync_microsoft_teams.sync_permissions(calendar_permissions)

            # Persist the ids first, then record the checkpoint.
            storage_with_collection["global_keys"] = list(ids_list)
            self.local_storage.update_storage(
                storage_with_collection, "calendar"
            )

            self.logger.debug("Saving the checkpoint for Calendars")
            queue.put_checkpoint("calendar", end_time, indexing_type)
        except Exception as exception:
            self.logger.exception(
                f"Error while fetching the calendars. Error: {exception}"
            )

        self.logger.info(
            "Completed fetching the calendar meetings"
        )