ees_sharepoint/full_sync_command.py (51 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module allows to run a full sync against a Sharepoint Server instance.

It will attempt to sync absolutely all documents that are available in the
third-party system and ingest them into Enterprise Search instance."""
from datetime import datetime

from .base_command import BaseCommand
from .connector_queue import ConnectorQueue
from .sync_enterprise_search import SyncEnterpriseSearch
from .sync_sharepoint import SyncSharepoint
from .utils import split_date_range_into_chunks


class FullSyncCommand(BaseCommand):
    """This class start execution of fullsync feature."""

    def start_producer(self, queue):
        """This method starts async calls for the producer which is responsible
        for fetching documents from the SharePoint and pushing them in the shared queue

        :param queue: Shared queue to fetch the stored documents
        :raises Exception: re-raises any error hit while fetching, after logging it
        """
        self.logger.debug("Starting the full indexing..")
        # Full sync window: from the configured start_time up to "now" (UTC).
        current_time = (datetime.utcnow()).strftime("%Y-%m-%dT%H:%M:%SZ")
        thread_count = self.config.get_value("sharepoint_sync_thread_count")
        start_time, end_time = self.config.get_value("start_time"), current_time
        try:
            sync_sharepoint = SyncSharepoint(
                self.config,
                self.logger,
                self.workplace_search_custom_client,
                self.sharepoint_client,
                start_time,
                end_time,
                queue,
            )
            # One date-range chunk per producer thread so fetches can run in parallel.
            datelist = split_date_range_into_chunks(
                start_time,
                end_time,
                thread_count,
            )
            for collection in self.config.get_value("sharepoint.site_collections"):
                storage_with_collection = self.local_storage.get_storage_with_collection(collection)
                self.logger.info(
                    "Starting to index all the objects configured in the object field: %s"
                    % (str(self.config.get_value("objects")))
                )
                ids = storage_with_collection["global_keys"][collection]
                storage_with_collection["global_keys"][collection] = sync_sharepoint.fetch_records_from_sharepoint(
                    self.producer, datelist, thread_count, ids, collection
                )
                queue.put_checkpoint(collection, end_time, "full")
                # Persist the refreshed document ids for THIS collection right away.
                # Previously this ran once after the loop, so only the last
                # collection's state was saved and an empty site_collections
                # list raised an uncaught NameError on the unbound variable.
                self.local_storage.update_storage(storage_with_collection)

            # Tell every consumer thread that no more work will be queued.
            enterprise_thread_count = self.config.get_value("enterprise_search_sync_thread_count")
            for _ in range(enterprise_thread_count):
                queue.end_signal()
        except Exception as exception:
            self.logger.exception(f"Error while fetching the objects . Error {exception}")
            # Bare raise preserves the original traceback for the caller.
            raise

    def start_consumer(self, queue):
        """This method starts async calls for the consumer which is responsible for
        indexing documents to the Enterprise Search

        :param queue: Shared queue to fetch the stored documents
        """
        thread_count = self.config.get_value("enterprise_search_sync_thread_count")
        sync_es = SyncEnterpriseSearch(self.config, self.logger, self.workplace_search_custom_client, queue)
        self.consumer(thread_count, sync_es.perform_sync)

    def execute(self):
        """This function execute the start function.

        Wires the producer (SharePoint fetch) and consumer (Enterprise Search
        indexing) together through a single shared queue.
        """
        queue = ConnectorQueue(self.logger)
        self.start_producer(queue)
        self.start_consumer(queue)