ees_microsoft_outlook/base_indexing_command.py (53 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""Module contains a base indexing command interface.
Connector can run multiple commands such as full-sync and incremental-sync.
This module provides a convenience interface defining the shared
objects and methods that can be used by commands."""
from .base_command import BaseCommand
from .checkpointing import Checkpoint
from .constant import (CONNECTOR_TYPE_MICROSOFT_EXCHANGE,
CONNECTOR_TYPE_OFFICE365)
from .microsoft_exchange_server_user import MicrosoftExchangeServerUser
from .office365_user import Office365User
from .sync_enterprise_search import SyncEnterpriseSearch
class BaseIndexingCommand(BaseCommand):
    """This class contains common methods for full-sync and incremental-sync commands."""

    def get_accounts(self):
        """This method gets the Outlook accounts of the active directory users.

        The user source is selected from the ``connector_platform_type``
        configuration value: Office365 is checked first, then Microsoft Exchange.

        Returns:
            users_accounts: Non-empty collection of all user accounts
        Raises:
            SystemExit: If the configured platform type is not supported or
                no user accounts could be fetched.
        """
        platform_type = self.config.get_value("connector_platform_type")
        self.logger.debug(
            f"Starting producer for fetching objects from {platform_type}"
        )
        self.logger.info(f"Fetching users account from the {platform_type}")

        # Logic to fetch users from Microsoft Exchange or Office365
        if CONNECTOR_TYPE_OFFICE365 in platform_type:
            connection = Office365User(self.config)
        elif CONNECTOR_TYPE_MICROSOFT_EXCHANGE in platform_type:
            connection = MicrosoftExchangeServerUser(self.config)
        else:
            # Previously this case fell through and crashed with an
            # UnboundLocalError because no accounts variable was ever assigned.
            self.logger.error(
                f"Unsupported connector platform type: {platform_type}"
            )
            raise SystemExit(1)

        users = connection.get_users()
        users_accounts = connection.get_users_accounts(users)

        # The former check `len(users_accounts) >= 0` was always true, which made
        # the error branch unreachable; an empty result is the actual failure case.
        if not users_accounts:
            self.logger.error("Error while fetching users from the Active Directory")
            # Raise SystemExit directly: the `exit()` helper is injected by the
            # `site` module for interactive use and is not guaranteed to exist.
            raise SystemExit(1)

        self.logger.info(
            f"Successfully fetched users accounts from the {platform_type}"
        )
        return users_accounts

    def pass_end_signal(self, queue):
        """This method passes the end signal into the queue, once per
        configured Enterprise Search sync thread
        (``enterprise_search_sync_thread_count``).

        :param queue: Shared queue to pass end signal
        """
        enterprise_thread_count = self.config.get_value(
            "enterprise_search_sync_thread_count"
        )
        for _ in range(enterprise_thread_count):
            queue.end_signal()

    def start_consumer(self, queue):
        """This method starts async calls for the consumer which is responsible for
        indexing documents to the Enterprise Search. Once the sync jobs have been
        created via ``create_jobs``, every entry collected in the sync object's
        ``checkpoint_list`` is stored as a checkpoint.

        :param queue: Shared queue to fetch the stored documents
        """
        checkpoint = Checkpoint(self.logger, self.config)
        thread_count = self.config.get_value("enterprise_search_sync_thread_count")
        sync_es = SyncEnterpriseSearch(
            self.config, self.logger, self.workplace_search_custom_client, queue
        )

        self.create_jobs(thread_count, sync_es.perform_sync, (), [])

        # Record the checkpoints gathered by the sync object.
        for checkpoint_data in sync_es.checkpoint_list:
            checkpoint.set_checkpoint(
                checkpoint_data["current_time"],
                checkpoint_data["index_type"],
                checkpoint_data["object_type"],
            )