ees_microsoft_outlook/base_command.py

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""Module contains a base command interface.

Connector can run multiple commands such as full-sync, incremental-sync, etc.
This module provides a convenience interface defining the shared objects and
methods that can be used by commands.
"""
import logging

# For Python>=3.8 cached_property should be imported from functools,
# and for the prior versions it should be imported from the
# cached_property package
try:
    from functools import cached_property
except ImportError:
    from cached_property import cached_property

from concurrent.futures import ThreadPoolExecutor, as_completed

from . import constant
from .configuration import Configuration
from .enterprise_search_wrapper import EnterpriseSearchWrapper
from .local_storage import LocalStorage
from .microsoft_outlook_calendar import MicrosoftOutlookCalendar
from .microsoft_outlook_contacts import MicrosoftOutlookContacts
from .microsoft_outlook_mails import MicrosoftOutlookMails
from .microsoft_outlook_tasks import MicrosoftOutlookTasks
from .utils import split_date_range_into_chunks


class BaseCommand:
    """Base interface for all module commands.

    Inherit from it and implement the 'execute' method, then add code to
    cli.py to register this command.
    """

    def __init__(self, args):
        self.args = args

    def execute(self):
        """Run the command.

        This method is overridden by actual commands with logic that is
        specific to each command implementing it.
        """
        raise NotImplementedError
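
    # Illustrative sketch, not part of the connector: a minimal command
    # subclass. `FullSyncCommand` and its body here are hypothetical
    # examples; the real commands live in their own modules and are
    # registered in cli.py.
    #
    #     class FullSyncCommand(BaseCommand):
    #         def execute(self):
    #             # self.logger and self.config are inherited as cached
    #             # properties from BaseCommand.
    #             self.logger.info("Starting a full sync")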
""" log_level = self.config.get_value("log_level") logger = logging.getLogger(__name__) logger.propagate = True logger.setLevel(log_level) handler = logging.StreamHandler() formatter = logging.Formatter( "%(asctime)s %(levelname)s Thread[%(thread)s]: %(message)s" ) handler.setFormatter(formatter) # Uncomment the following lines to output logs in ECS-compatible format # formatter = ecs_logging.StdlibFormatter() # handler.setFormatter(formatter) handler.setLevel(log_level) logger.addHandler(handler) return logger @cached_property def workplace_search_custom_client(self): """Get the Workplace Search custom client instance for the running command.""" return EnterpriseSearchWrapper(self.logger, self.config, self.args) @cached_property def config(self): """Get the configuration for the connector for the running command.""" file_name = self.args.config_file return Configuration(file_name) @cached_property def local_storage(self): """Get the object for local storage to fetch and update ids stored locally""" return LocalStorage(self.logger) @cached_property def microsoft_outlook_mail_object(self): """Get the object for fetching the mails related data""" return MicrosoftOutlookMails(self.logger, self.config) @cached_property def microsoft_outlook_calendar_object(self): """Get the object for fetching the calendars related data""" return MicrosoftOutlookCalendar(self.logger, self.config) @cached_property def microsoft_outlook_contact_object(self): """Get the object for fetching the contacts related data""" return MicrosoftOutlookContacts(self.logger, self.config) @cached_property def microsoft_outlook_task_object(self): """Get the object for fetching the tasks related data""" return MicrosoftOutlookTasks(self.logger, self.config) def create_jobs(self, thread_count, func, args, iterable_list): """Creates a thread pool of given number of thread count :param thread_count: Total number of threads to be spawned :param func: The target function on which the async calls would be made :param args: Arguments for the targeted function :param iterable_list: list to iterate over and create thread """ # If iterable_list is present, then iterate over the list and pass each list element # as an argument to the async function, else iterate over number of threads configured if iterable_list: documents = [] with ThreadPoolExecutor(max_workers=thread_count) as executor: future_to_path = { executor.submit(func, *args, *list_element): list_element for list_element in iterable_list } for future in as_completed(future_to_path): try: if future.result(): documents.extend(future.result()) except Exception as exception: self.logger.exception( f"Error while fetching the data from Microsoft Outlook. Error {exception}" ) return documents else: with ThreadPoolExecutor(max_workers=thread_count) as executor: for _ in range(thread_count): executor.submit(func) def create_jobs_for_mails( self, indexing_type, sync_microsoft_outlook, thread_count, users_accounts, time_range_list, end_time, queue, ): """Create job for fetching the mails :param indexing_type: The type of the indexing i.e. 
    def create_jobs_for_mails(
        self,
        indexing_type,
        sync_microsoft_outlook,
        thread_count,
        users_accounts,
        time_range_list,
        end_time,
        queue,
    ):
        """Create jobs for fetching the mails.

        :param indexing_type: The type of the indexing i.e. Full or Incremental
        :param sync_microsoft_outlook: Object of SyncMicrosoftOutlook
        :param thread_count: Thread count to make partitions
        :param users_accounts: List of user accounts
        :param time_range_list: List of time ranges for fetching the data
        :param end_time: End time for setting the checkpoint
        :param queue: Shared queue for storing the data
        """
        if constant.MAILS_OBJECT.lower() not in self.config.get_value("objects"):
            self.logger.info(
                "Mails are not being indexed because the user has excluded them in the configuration file"
            )
            return
        self.logger.debug("Started fetching the mails")
        storage_with_collection = self.local_storage.get_storage_with_collection(
            self.local_storage, constant.MAIL_DELETION_PATH
        )
        ids_list = storage_with_collection.get("global_keys")
        self.create_jobs(
            thread_count,
            sync_microsoft_outlook.fetch_mails,
            (ids_list, users_accounts, self.microsoft_outlook_mail_object),
            time_range_list,
        )
        storage_with_collection["global_keys"] = list(ids_list)
        self.local_storage.update_storage(
            storage_with_collection, constant.MAIL_DELETION_PATH
        )
        queue.put_checkpoint(constant.MAILS_OBJECT.lower(), end_time, indexing_type)

    def create_jobs_for_calendar(
        self,
        indexing_type,
        sync_microsoft_outlook,
        thread_count,
        users_accounts,
        time_range_list,
        end_time,
        queue,
    ):
        """Create jobs for fetching the calendars.

        :param indexing_type: The type of the indexing i.e. Full or Incremental
        :param sync_microsoft_outlook: Object of SyncMicrosoftOutlook
        :param thread_count: Thread count to make partitions
        :param users_accounts: List of user accounts
        :param time_range_list: List of time ranges for fetching the data
        :param end_time: End time for setting the checkpoint
        :param queue: Shared queue for storing the data
        """
        if constant.CALENDARS_OBJECT.lower() not in self.config.get_value("objects"):
            self.logger.info(
                "Calendar events are not being indexed because the user has excluded them in the configuration file"
            )
            return
        self.logger.debug("Started fetching the calendar")
        storage_with_collection = self.local_storage.get_storage_with_collection(
            self.local_storage, constant.CALENDAR_DELETION_PATH
        )
        ids_list = storage_with_collection.get("global_keys")
        self.create_jobs(
            thread_count,
            sync_microsoft_outlook.fetch_calendar,
            (ids_list, users_accounts, self.microsoft_outlook_calendar_object),
            time_range_list,
        )
        storage_with_collection["global_keys"] = list(ids_list)
        self.local_storage.update_storage(
            storage_with_collection, constant.CALENDAR_DELETION_PATH
        )
        queue.put_checkpoint(constant.CALENDARS_OBJECT.lower(), end_time, indexing_type)
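
    # Illustrative sketch, not part of the connector: how a sync command
    # might drive one of the create_jobs_for_* helpers. `sync_outlook`,
    # `shared_queue`, and the "full" indexing type are hypothetical
    # stand-ins; the real wiring lives in the full- and incremental-sync
    # commands.
    #
    #     time_ranges = command.get_datetime_iterable_list(start_time, end_time)
    #     command.create_jobs_for_mails(
    #         "full",              # indexing_type
    #         sync_outlook,        # SyncMicrosoftOutlook instance
    #         thread_count,
    #         users_accounts,
    #         time_ranges,
    #         end_time,
    #         shared_queue,        # must expose put_checkpoint()
    #     )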
    def create_jobs_for_contacts(
        self,
        indexing_type,
        sync_microsoft_outlook,
        thread_count,
        users_accounts,
        time_range_list,
        end_time,
        queue,
    ):
        """Create jobs for fetching the contacts.

        :param indexing_type: The type of the indexing i.e. Full or Incremental
        :param sync_microsoft_outlook: Object of SyncMicrosoftOutlook
        :param thread_count: Thread count to make partitions
        :param users_accounts: List of user accounts
        :param time_range_list: List of time ranges for fetching the data
        :param end_time: End time for setting the checkpoint
        :param queue: Shared queue for storing the data
        """
        if constant.CONTACTS_OBJECT.lower() not in self.config.get_value("objects"):
            self.logger.info(
                "Contacts are not being indexed because the user has excluded them in the configuration file"
            )
            return
        self.logger.debug("Started fetching the contacts")
        storage_with_collection = self.local_storage.get_storage_with_collection(
            self.local_storage, constant.CONTACT_DELETION_PATH
        )
        ids_list = storage_with_collection.get("global_keys")
        self.create_jobs(
            thread_count,
            sync_microsoft_outlook.fetch_contacts,
            (ids_list, users_accounts, self.microsoft_outlook_contact_object),
            time_range_list,
        )
        storage_with_collection["global_keys"] = list(ids_list)
        self.local_storage.update_storage(
            storage_with_collection, constant.CONTACT_DELETION_PATH
        )
        queue.put_checkpoint(constant.CONTACTS_OBJECT.lower(), end_time, indexing_type)

    def create_jobs_for_tasks(
        self,
        indexing_type,
        sync_microsoft_outlook,
        thread_count,
        users_accounts,
        time_range_list,
        end_time,
        queue,
    ):
        """Create jobs for fetching the tasks.

        :param indexing_type: The type of the indexing i.e. Full or Incremental
        :param sync_microsoft_outlook: Object of SyncMicrosoftOutlook
        :param thread_count: Thread count to make partitions
        :param users_accounts: List of user accounts
        :param time_range_list: List of time ranges for fetching the data
        :param end_time: End time for setting the checkpoint
        :param queue: Shared queue for storing the data
        """
        if constant.TASKS_OBJECT.lower() not in self.config.get_value("objects"):
            self.logger.info(
                "Tasks are not being indexed because the user has excluded them in the configuration file"
            )
            return
        self.logger.debug("Started fetching the tasks")
        storage_with_collection = self.local_storage.get_storage_with_collection(
            self.local_storage, constant.TASK_DELETION_PATH
        )
        ids_list = storage_with_collection.get("global_keys")
        self.create_jobs(
            thread_count,
            sync_microsoft_outlook.fetch_tasks,
            (ids_list, users_accounts, self.microsoft_outlook_task_object),
            time_range_list,
        )
        storage_with_collection["global_keys"] = list(ids_list)
        self.local_storage.update_storage(
            storage_with_collection, constant.TASK_DELETION_PATH
        )
        queue.put_checkpoint(constant.TASKS_OBJECT.lower(), end_time, indexing_type)

    def get_datetime_iterable_list(self, start_time, end_time):
        """Get time range partitions based on the time duration and thread count.

        :param start_time: Start time for fetching data
        :param end_time: End time for fetching data
        """
        thread_count = self.config.get_value("source_sync_thread_count")
        datelist_mails = split_date_range_into_chunks(
            start_time,
            end_time,
            thread_count,
        )
        time_range_list = []
        for num in range(thread_count):
            time_range_list.append((datelist_mails[num], datelist_mails[num + 1]))
        return time_range_list
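
# A minimal worked example, not part of the connector, of the partitioning
# that get_datetime_iterable_list performs. Given how it is indexed above,
# split_date_range_into_chunks must return thread_count + 1 boundaries
# spanning [start_time, end_time], so pairing neighbouring boundaries yields
# thread_count contiguous (start, end) windows:
#
#     boundaries = ["00:00", "08:00", "16:00", "24:00"]  # thread_count = 3
#     windows = list(zip(boundaries, boundaries[1:]))
#     # -> [("00:00", "08:00"), ("08:00", "16:00"), ("16:00", "24:00")]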