ees_microsoft_outlook/microsoft_outlook_tasks.py (173 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # """This module allows to fetch tasks from Microsoft Outlook. """ import requests from iteration_utilities import unique_everseen from . import constant from .utils import ( change_datetime_format, convert_datetime_to_ews_format, extract, get_schema_fields, insert_document_into_doc_id_storage, retry, ) class MicrosoftOutlookTasks: """This class fetches tasks for all users from Microsoft Outlook""" def __init__(self, logger, config): self.logger = logger self.config = config self.time_zone = constant.DEFAULT_TIME_ZONE self.retry_count = self.config.get_value("retry_count") def get_task_attachments( self, ids_list_tasks, task_obj, user_email_address, start_time, end_time ): """Method is used to fetch attachment from task object store in dictionary :param ids_list_tasks: Documents ids of tasks :param mail_obj: Object of account :param user_email_address: Email address of user :param start_time: Start time for fetching the tasks :param end_time: End time for fetching the tasks Returns: task_attachments: Dictionary of attachment """ task_attachments = [] for attachment in task_obj.attachments: # Logic for task last modified time attachment_created = "" if attachment.last_modified_time: attachment_created = change_datetime_format( attachment.last_modified_time, self.time_zone ) # Logic to fetch task attachments if attachment.last_modified_time >= start_time and attachment.last_modified_time < end_time: attachments = { "type": constant.TASKS_ATTACHMENTS_OBJECT, "id": attachment.attachment_id.id, "title": attachment.name, "created": attachment_created, } attachments["_allow_permissions"] = [] if self.config.get_value("enable_document_permission"): attachments["_allow_permissions"] = [user_email_address] # Logic to insert task attachment into global_keys object insert_document_into_doc_id_storage( ids_list_tasks, attachment.attachment_id.id, task_obj.id, constant.TASKS_ATTACHMENTS_OBJECT.lower(), self.config.get_value("connector_platform_type"), ) if hasattr(attachment, "content"): attachments["body"] = extract(attachment.content) task_attachments.append(attachments) return task_attachments def tasks_to_docs( self, task_obj, ids_list_tasks, user_email_address, start_time, end_time ): """Method is used to convert task data into Workplace Search document :param task_obj: Object of task :param ids_list_tasks: List of ids of documents :param user_email_address: Email address of user :param start_time: Start time for fetching the tasks :param end_time: End time for fetching the tasks Returns: task_document: Dictionary of task task_attachments_documents: Dictionary of attachment """ # Logic for task last modified time if task_obj.last_modified_time: task_created = change_datetime_format( task_obj.last_modified_time, self.time_zone ) else: task_created = "" # Logic for task start date if task_obj.start_date: task_start = change_datetime_format(task_obj.start_date, self.time_zone) else: task_start = "" # Logic for task due date if task_obj.due_date: task_due = change_datetime_format(task_obj.due_date, self.time_zone) else: task_due = "" # Logic for task complete date if task_obj.complete_date: task_complete = ( change_datetime_format(task_obj.complete_date, self.time_zone) ).split("T", 1)[0] else: task_complete = "" # Logic for task categories if task_obj.categories: task_categories_list = [] for categories in task_obj.categories: task_categories_list.append(categories) task_categories = ", ".join(task_categories_list) else: task_categories = "" # Logic for task companies if task_obj.companies: task_companies = task_obj.companies[0] else: task_companies = "" # Logic to create document body task_document = { "type": constant.TASKS_OBJECT, "Id": task_obj.id, "DisplayName": task_obj.subject, "Created": task_created, } # Logic to bifurcate connector platform document if constant.CONNECTOR_TYPE_MICROSOFT_EXCHANGE in self.config.get_value( "connector_platform_type" ): task_document[ "Description" ] = f""" Due Date: {task_due} Status: {task_obj.status} Owner: {task_obj.owner} Start Date: {task_start} Complete Date: {task_complete} Body: {task_obj.text_body} Companies: {task_companies} Categories: {task_categories} Importance: {task_obj.importance}""" elif constant.CONNECTOR_TYPE_OFFICE365 in self.config.get_value( "connector_platform_type" ): task_document[ "Description" ] = f""" Due Date: {task_due} Status: {task_obj.status} Owner: {task_obj.owner} Complete Date: {task_complete} Body: {task_obj.text_body} Categories: {task_categories} Importance: {task_obj.importance}""" # Logic to fetches attachments task_attachments_documents = [] if task_obj.has_attachments: task_attachments_documents = self.get_task_attachments( ids_list_tasks, task_obj, user_email_address, start_time, end_time ) return task_document, task_attachments_documents @retry(exception_list=(requests.exceptions.RequestException,)) def get_tasks(self, ids_list_tasks, accounts, start_time, end_time): """This method is used to fetch tasks from Microsoft Outlook :param ids_list_tasks: List of ids of documents :param accounts: List of user accounts :param start_time: Start time for fetching the tasks :param end_time: End time for fetching the tasks Returns: documents: List of documents """ documents = [] start_time = convert_datetime_to_ews_format(start_time) end_time = convert_datetime_to_ews_format(end_time) task_schema = get_schema_fields( constant.TASKS_OBJECT.lower(), self.config.get_value("objects") ) for account in accounts: # Logic to set time zone according to user account self.time_zone = account.default_timezone try: # Logic to fetch tasks for task in ( account.tasks.all() .filter( last_modified_time__gt=start_time, last_modified_time__lt=end_time, ) .only( "last_modified_time", "due_date", "complete_date", "subject", "status", "owner", "start_date", "text_body", "companies", "categories", "importance", "has_attachments", "attachments", ) ): # Logic to insert task into global_keys object insert_document_into_doc_id_storage( ids_list_tasks, task.id, "", constant.TASKS_OBJECT.lower(), self.config.get_value("connector_platform_type"), ) (task_obj, task_attachment,) = self.tasks_to_docs( task, ids_list_tasks, account.primary_smtp_address, start_time, end_time, ) task_map = {} task_map["_allow_permissions"] = [] if self.config.get_value("enable_document_permission"): task_map["_allow_permissions"] = [account.primary_smtp_address] task_map["type"] = constant.TASKS_OBJECT for ws_field, ms_field in task_schema.items(): task_map[ws_field] = task_obj[ms_field] documents.append(task_map) if task_attachment: documents.extend(task_attachment) except requests.exceptions.RequestException as request_error: raise requests.exceptions.RequestException( f"Error while fetching tasks data for {account.primary_smtp_address}. Error: {request_error}" ) except Exception as exception: self.logger.info( f"Error while fetching tasks data for {account.primary_smtp_address}. Error: {exception}" ) pass return list(unique_everseen(documents))