ees_microsoft_outlook/microsoft_outlook_mails.py (219 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module allows to fetch mails from Microsoft Outlook.
"""

import requests
from iteration_utilities import unique_everseen

from . import constant
from .utils import (
    change_datetime_format,
    convert_datetime_to_ews_format,
    extract,
    get_schema_fields,
    html_to_text,
    insert_document_into_doc_id_storage,
    retry,
)


class MicrosoftOutlookMails:
    """This class fetches mails for all users from Microsoft Outlook"""

    def __init__(self, logger, config):
        """Store the logger and configuration used by every fetch.
        :param logger: Logger object used to report fetch errors
        :param config: Configuration object exposing `get_value(key)`
        """
        self.logger = logger
        self.config = config
        # Overwritten per account in `get_mails`; read when formatting timestamps.
        self.time_zone = constant.DEFAULT_TIME_ZONE
        # NOTE(review): not read in this class; presumably consumed by the
        # `retry` decorator - confirm in utils.
        self.retry_count = self.config.get_value("retry_count")

    @staticmethod
    def _join_email_addresses(recipients):
        """Join the email addresses of the recipients into one string
        :param recipients: Iterable of objects having an `email_address`
            attribute, or a falsy value when there are none
        Returns:
            Comma separated email addresses, "" when there are no recipients
        """
        if not recipients:
            return ""
        # An address that is None cannot be joined, so it is left out instead
        # of failing the whole mail with a TypeError.
        return ", ".join(
            recipient.email_address
            for recipient in recipients
            if recipient.email_address is not None
        )

    def get_mail_attachments(
        self, ids_list_mails, mail_obj, user_email_address, start_time, end_time
    ):
        """Method is used to fetch attachment from mail object store in dictionary
        :param ids_list_mails: Documents ids of mails
        :param mail_obj: Mail object whose `attachments` are converted
        :param user_email_address: Email address of user
        :param start_time: Start time (inclusive) for fetching the attachments
        :param end_time: End time (exclusive) for fetching the attachments
        Returns:
            mail_attachments: List of attachment documents
        """
        mail_attachments = []
        enable_permission = self.config.get_value("enable_document_permission")
        platform_type = self.config.get_value("connector_platform_type")

        for attachment in mail_obj.attachments:
            modified_time = attachment.last_modified_time

            # An attachment without a timestamp cannot be placed in the
            # [start_time, end_time) window; comparing None with a datetime
            # would raise TypeError, so it is skipped.
            if not modified_time:
                continue
            if not (modified_time >= start_time and modified_time < end_time):
                continue

            attachment_doc = {
                "type": constant.MAILS_ATTACHMENTS_OBJECT,
                "id": attachment.attachment_id.id,
                "title": attachment.name,
                "created": change_datetime_format(modified_time, self.time_zone),
                "_allow_permissions": (
                    [user_email_address] if enable_permission else []
                ),
            }

            # Logic to insert mail attachment into global_keys object
            insert_document_into_doc_id_storage(
                ids_list_mails,
                attachment.attachment_id.id,
                mail_obj.id,
                constant.MAILS_ATTACHMENTS_OBJECT.lower(),
                platform_type,
            )

            # Only attachments exposing a `content` attribute get a body.
            if hasattr(attachment, "content"):
                attachment_doc["body"] = extract(attachment.content)
            mail_attachments.append(attachment_doc)

        return mail_attachments

    def mails_to_docs(
        self,
        ids_list_mails,
        mail_type,
        mail_obj,
        user_email_address,
        start_time,
        end_time,
    ):
        """Method is used to convert mail data into Workplace Search document
        :param ids_list_mails: Documents ids of mails
        :param mail_type: Type of the mail like inbox, sent, junk
        :param mail_obj: Mail object to convert
        :param user_email_address: Email address of user
        :param start_time: Start time for fetching the mails
        :param end_time: End time for fetching the mails
        Returns:
            mail_document: Dictionary of mail
            mail_attachments_documents: List of attachment documents
        """
        sender_email = mail_obj.sender.email_address if mail_obj.sender else ""
        receiver_email = self._join_email_addresses(mail_obj.to_recipients)
        cc = self._join_email_addresses(mail_obj.cc_recipients)
        bcc = self._join_email_addresses(mail_obj.bcc_recipients)

        # Logic for mail last modified time
        if mail_obj.last_modified_time:
            mail_created = change_datetime_format(
                mail_obj.last_modified_time, self.time_zone
            )
        else:
            mail_created = ""

        # Logic for mail categories
        mail_categories = (
            ", ".join(mail_obj.categories) if mail_obj.categories else ""
        )

        # Logic to create document body
        mail_document = {
            "type": mail_type,
            "Id": mail_obj.id,
            "DisplayName": mail_obj.subject,
            "Description": (
                f"Sender Email: {sender_email} "
                f"Receiver Email: {receiver_email} "
                f"CC: {cc} "
                f"BCC: {bcc} "
                f"Importance: {mail_obj.importance} "
                f"Category: {mail_categories} "
                f"Body: {html_to_text(mail_obj.body)}"
            ),
            "Created": mail_created,
        }

        # Logic to fetch attachments
        mail_attachments_documents = []
        if mail_obj.has_attachments:
            mail_attachments_documents = self.get_mail_attachments(
                ids_list_mails, mail_obj, user_email_address, start_time, end_time
            )

        return mail_document, mail_attachments_documents

    def get_mail_documents(
        self, account, ids_list_mails, mail_type, mail_objs, start_time, end_time
    ):
        """This method is used to get mail's data and mapped with fields
        :param account: User account object
        :param ids_list_mails: Documents ids list
        :param mail_type: Type of mail like inbox, sent, junk
        :param mail_objs: Iterable of mail objects to convert
        :param start_time: Start time for fetching the mails
        :param end_time: End time for fetching the mails
        Returns:
            documents: List of documents
        """
        documents = []
        mail_schema = get_schema_fields(
            constant.MAILS_OBJECT.lower(), self.config.get_value("objects")
        )
        enable_permission = self.config.get_value("enable_document_permission")
        platform_type = self.config.get_value("connector_platform_type")

        for mail_obj in mail_objs:
            # Logic to insert mail into global_keys object
            insert_document_into_doc_id_storage(
                ids_list_mails,
                mail_obj.id,
                "",
                mail_type.lower(),
                platform_type,
            )
            mail_dict, mail_attachments = self.mails_to_docs(
                ids_list_mails,
                mail_type,
                mail_obj,
                account.primary_smtp_address,
                start_time,
                end_time,
            )

            mail_map = {
                "_allow_permissions": (
                    [account.primary_smtp_address] if enable_permission else []
                ),
                "type": mail_dict["type"],
            }
            # Map every Workplace Search field to its Microsoft Outlook value.
            for ws_field, ms_field in mail_schema.items():
                mail_map[ws_field] = mail_dict[ms_field]

            documents.append(mail_map)
            documents.extend(mail_attachments)

        return documents

    @retry(exception_list=(requests.exceptions.RequestException,))
    def get_mails(self, ids_list_mails, accounts, start_time, end_time):
        """This method is used to get documents of mails and mapped with Workplace Search fields
        :param ids_list_mails: List of ids of documents
        :param accounts: List of user accounts
        :param start_time: Start time for fetching the mails
        :param end_time: End time for fetching the mails
        Returns:
            documents: List of all types of mail documents, without duplicates
        Raises:
            requests.exceptions.RequestException: re-raised with the account's
                email address in the message; any other exception is logged
                and the remaining folders of that account are skipped
        """
        documents = []
        mail_folders = [
            {
                "folder": "inbox",
                "constant": constant.INBOX_MAIL_OBJECT,
            },
            {
                "folder": "sent",
                "constant": constant.SENT_MAIL_OBJECT,
            },
            {
                "folder": "junk",
                "constant": constant.JUNK_MAIL_OBJECT,
            },
            {
                "folder": "archive",
                "constant": constant.ARCHIVE_MAIL_OBJECT,
            },
        ]
        start_time = convert_datetime_to_ews_format(start_time)
        end_time = convert_datetime_to_ews_format(end_time)

        for account in accounts:
            # Logic to set time zone according to user account
            self.time_zone = account.default_timezone

            try:
                for folder_info in mail_folders:
                    # Logic to get mails folder: the archive is reached by
                    # path, the other folders are attributes of the account.
                    if folder_info["folder"] == "archive":
                        mail_folder = (
                            account.root / "Top of Information Store" / "Archive"
                        )
                    else:
                        mail_folder = getattr(account, folder_info["folder"])

                    # Logic to fetch mails
                    # NOTE(review): both bounds are exclusive here while the
                    # attachment window uses an inclusive start - confirm a
                    # mail modified exactly at start_time should be excluded.
                    mail_objs = (
                        mail_folder.all()
                        .filter(
                            last_modified_time__gt=start_time,
                            last_modified_time__lt=end_time,
                        )
                        .only(
                            "sender",
                            "to_recipients",
                            "cc_recipients",
                            "bcc_recipients",
                            "last_modified_time",
                            "subject",
                            "importance",
                            "categories",
                            "body",
                            "has_attachments",
                            "attachments",
                        )
                    )

                    documents.extend(
                        self.get_mail_documents(
                            account,
                            ids_list_mails,
                            folder_info["constant"],
                            mail_objs,
                            start_time,
                            end_time,
                        )
                    )
            except requests.exceptions.RequestException as request_error:
                # Chain the original error so its traceback is preserved.
                raise requests.exceptions.RequestException(
                    f"Error while fetching mails data for {account.primary_smtp_address}. Error: {request_error}"
                ) from request_error
            except Exception as exception:
                # Best effort: a failure on one account must not stop the others.
                self.logger.info(
                    f"Error while fetching mails data for {account.primary_smtp_address}. Error: {exception}"
                )

        return list(unique_everseen(documents))