ees_microsoft_outlook/deletion_sync_command.py (244 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module allows to remove recently deleted documents from Elastic Enterprise Search.
Documents that were deleted in source will still be available in
Elastic Enterprise Search until a full sync happens, or until this module is used.
"""
from . import constant
from .base_command import BaseCommand
from .connector_queue import ConnectorQueue
from .microsoft_exchange_server_user import MicrosoftExchangeServerUser
from .office365_user import Office365User
from .sync_enterprise_search import SyncEnterpriseSearch
class DeletionSyncCommand(BaseCommand):
"""This class start executions of deletion feature."""
def remove_deleted_documents_from_global_keys(
self,
live_documents,
list_ids_documents,
deleted_documents,
global_keys_documents,
):
"""Updates the local storage with removing the keys that were deleted from Microsoft Outlook
:param live_documents: Documents present in Microsoft Outlook
:param list_ids_documents: Documents present in respective doc_ids.json files
:param deleted_documents: Document list that were deleted from Microsoft Outlook
:param global_keys_documents: Document list that are present in doc_ids.json
:param parent_id: Parent id of the document
"""
for item in list_ids_documents:
item_id = item["id"]
platform = item["platform"]
items_exists = list(
filter(
lambda seq: seq["id"] == item_id,
live_documents,
)
)
if len(items_exists) == 0 and self.config.get_value("connector_platform_type") in platform:
deleted_documents.append(item_id)
if item in global_keys_documents:
global_keys_documents.remove(item)
def create_jobs_for_mails_deletion(
self,
thread_count,
users_accounts,
time_range_list,
queue,
):
"""Creates jobs for deleting the mails
:param thread_count: Thread count to make partitions
:param users_accounts: User accounts
:param time_range_list: Time range split
:param queue: Shared queue for storing the data
"""
if constant.MAILS_OBJECT.lower() not in self.config.get_value("objects"):
return
self.logger.debug("Started deletion of mails...")
storage_with_collection = self.local_storage.get_storage_with_collection(
self.local_storage, constant.MAIL_DELETION_PATH
)
mails_documents = self.create_jobs(
thread_count,
self.microsoft_outlook_mail_object.get_mails,
(
[],
users_accounts,
),
time_range_list,
)
delete_keys_documents = storage_with_collection.get("delete_keys") or []
global_keys_documents = storage_with_collection.get("global_keys") or []
deleted_documents = []
self.remove_deleted_documents_from_global_keys(
mails_documents,
delete_keys_documents,
deleted_documents,
global_keys_documents,
)
queue.append_to_queue("deletion", list(deleted_documents))
storage_with_collection["global_keys"] = list(global_keys_documents)
storage_with_collection["delete_keys"] = []
self.local_storage.update_storage(
storage_with_collection, constant.MAIL_DELETION_PATH
)
self.logger.info("Completed deletion of mails")
def create_jobs_for_calendar_deletion(
self,
thread_count,
users_accounts,
time_range_list,
queue,
):
"""Creates jobs for deleting the calendar
:param thread_count: Thread count to make partitions
:param users_accounts: User accounts
:param time_range_list: Time range split
:param queue: Shared queue for storing the data
"""
if constant.CALENDARS_OBJECT.lower() not in self.config.get_value("objects"):
return
self.logger.debug("Started deletion of calendar...")
storage_with_collection = self.local_storage.get_storage_with_collection(
self.local_storage, constant.CALENDAR_DELETION_PATH
)
calendar_documents = self.create_jobs(
thread_count,
self.microsoft_outlook_calendar_object.get_calendar,
([], users_accounts,),
time_range_list,
)
delete_keys_documents = storage_with_collection.get("delete_keys") or []
global_keys_documents = storage_with_collection.get("global_keys") or []
deleted_documents = []
self.remove_deleted_documents_from_global_keys(
calendar_documents,
delete_keys_documents,
deleted_documents,
global_keys_documents,
)
queue.append_to_queue("deletion", list(deleted_documents))
storage_with_collection["global_keys"] = list(global_keys_documents)
storage_with_collection["delete_keys"] = []
self.local_storage.update_storage(
storage_with_collection, constant.CALENDAR_DELETION_PATH
)
self.logger.info("Completed deletion of calendar")
def create_jobs_for_contacts_deletion(
self,
thread_count,
users_accounts,
time_range_list,
queue,
):
"""Creates jobs for deleting the contacts
:param thread_count: Thread count to make partitions
:param users_accounts: User accounts
:param time_range_list: Time range split
:param queue: Shared queue for storing the data
"""
if constant.CONTACTS_OBJECT.lower() not in self.config.get_value("objects"):
return
self.logger.debug("Started deletion of contacts...")
storage_with_collection = self.local_storage.get_storage_with_collection(
self.local_storage, constant.CONTACT_DELETION_PATH
)
contacts_documents = self.create_jobs(
thread_count,
self.microsoft_outlook_contact_object.get_contacts,
(
[],
users_accounts,
),
time_range_list,
)
delete_keys_documents = storage_with_collection.get("delete_keys") or []
global_keys_documents = storage_with_collection.get("global_keys") or []
deleted_documents = []
self.remove_deleted_documents_from_global_keys(
contacts_documents,
delete_keys_documents,
deleted_documents,
global_keys_documents,
)
queue.append_to_queue("deletion", list(deleted_documents))
storage_with_collection["global_keys"] = list(global_keys_documents)
storage_with_collection["delete_keys"] = []
self.local_storage.update_storage(
storage_with_collection, constant.CONTACT_DELETION_PATH
)
self.logger.info("Completed deletion of contacts")
def create_jobs_for_tasks_deletion(
self,
thread_count,
users_accounts,
time_range_list,
queue,
):
"""Creates jobs for deleting the tasks
:param thread_count: Thread count to make partitions
:param users_accounts: User accounts
:param time_range_list: Time range split
:param queue: Shared queue for storing the data
"""
if constant.TASKS_OBJECT.lower() not in self.config.get_value("objects"):
return
self.logger.debug("Started deletion of tasks...")
storage_with_collection = self.local_storage.get_storage_with_collection(
self.local_storage, constant.TASK_DELETION_PATH
)
tasks_documents = self.create_jobs(
thread_count,
self.microsoft_outlook_task_object.get_tasks,
([], users_accounts,),
time_range_list,
)
delete_keys_documents = storage_with_collection.get("delete_keys") or []
global_keys_documents = storage_with_collection.get("global_keys") or []
deleted_documents = []
self.remove_deleted_documents_from_global_keys(
tasks_documents,
delete_keys_documents,
deleted_documents,
global_keys_documents,
)
queue.append_to_queue("deletion", list(deleted_documents))
storage_with_collection["global_keys"] = list(global_keys_documents)
storage_with_collection["delete_keys"] = []
self.local_storage.update_storage(
storage_with_collection, constant.TASK_DELETION_PATH
)
self.logger.info("Completed deletion of tasks")
def start_producer(self, queue):
"""This method starts async calls for the producer which is responsible
for fetching documents from the Microsoft Outlook and pushing them in the shared queue
:param queue: Shared queue to store the fetched documents
"""
thread_count = self.config.get_value("microsoft_outlook_sync_thread_count")
product_type = self.config.get_value("connector_platform_type")
self.logger.debug(f"Starting producer for fetching objects from {product_type}")
# Logic to fetch users from Microsoft Exchange or Office365
if constant.CONNECTOR_TYPE_OFFICE365 in self.config.get_value(
"connector_platform_type"
):
office365_connection = Office365User(self.config)
users = office365_connection.get_users()
users_accounts = office365_connection.get_users_accounts(users)
elif constant.CONNECTOR_TYPE_MICROSOFT_EXCHANGE in self.config.get_value(
"connector_platform_type"
):
microsoft_exchange_server_connection = MicrosoftExchangeServerUser(
self.config
)
users = microsoft_exchange_server_connection.get_users()
users_accounts = microsoft_exchange_server_connection.get_users_accounts(
users
)
if len(users_accounts) >= 0:
self.logger.info(
f"Successfully fetched users accounts from the {product_type}"
)
else:
self.logger.info("Error while fetching users from the Active Directory")
exit()
start_time, end_time = (
self.config.get_value("start_time"),
constant.CURRENT_TIME,
)
# Logic to fetch mails, calendars, contacts and task from Microsoft Outlook by using multithreading approach
time_range_list = self.get_datetime_iterable_list(start_time, end_time)
self.create_jobs_for_mails_deletion(
thread_count,
users_accounts,
time_range_list,
queue,
)
self.create_jobs_for_calendar_deletion(
thread_count,
users_accounts,
time_range_list,
queue,
)
self.create_jobs_for_contacts_deletion(
thread_count,
users_accounts,
time_range_list,
queue,
)
self.create_jobs_for_tasks_deletion(
thread_count,
users_accounts,
time_range_list,
queue,
)
for _ in range(self.config.get_value("enterprise_search_sync_thread_count")):
queue.end_signal()
def start_consumer(self, queue):
"""This method starts async calls for the consumer which is responsible for indexing documents to the
Enterprise Search
:param queue: Shared queue to fetch the stored documents
"""
self.logger.debug("Starting consumer for deleting objects to Workplace Search")
thread_count = self.config.get_value("enterprise_search_sync_thread_count")
sync_es = SyncEnterpriseSearch(
self.config, self.logger, self.workplace_search_custom_client, queue
)
self.create_jobs(thread_count, sync_es.perform_sync, (), [])
self.logger.info("Completed deletion of the Microsoft Outlook objects")
def execute(self):
"""This function execute the start function."""
queue = ConnectorQueue(self.logger)
self.start_producer(queue)
self.start_consumer(queue)
self.logger.info("Completed Deletion sync")