ees_sharepoint/sync_enterprise_search.py

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import threading

from .checkpointing import Checkpoint
from .utils import split_documents_into_equal_chunks

BATCH_SIZE = 100
CONNECTION_TIMEOUT = 1000


class SyncEnterpriseSearch:
    """This class allows ingesting documents to Elastic Enterprise Search."""

    def __init__(self, config, logger, workplace_search_custom_client, queue):
        self.config = config
        self.logger = logger
        self.workplace_search_custom_client = workplace_search_custom_client
        self.queue = queue

    def index_documents(self, documents):
        """Index the documents into Enterprise Search.
        :param documents: documents to be indexed
        """
        total_documents_indexed = 0
        if documents:
            responses = self.workplace_search_custom_client.index_documents(
                documents=documents,
                timeout=CONNECTION_TIMEOUT,
            )
            for response in responses["results"]:
                if not response["errors"]:
                    total_documents_indexed += 1
                else:
                    self.logger.error(
                        "Error while indexing %s. Error: %s"
                        % (response["id"], response["errors"])
                    )
        self.logger.info(
            f"[{threading.get_ident()}] Successfully indexed {total_documents_indexed} documents to Workplace Search"
        )

    def perform_sync(self):
        """Pull documents from the queue and synchronize them to Enterprise Search."""
        try:
            checkpoint = Checkpoint(self.config, self.logger)
            signal_open = True
            while signal_open:
                documents_to_index = []
                # Accumulate documents from the queue until a full batch is
                # collected or a control message ends the inner loop.
                while len(documents_to_index) < BATCH_SIZE:
                    documents = self.queue.get()
                    if documents.get("type") == "signal_close":
                        self.logger.info(
                            f"Found an end signal in the queue. Closing Thread ID {threading.get_ident()}"
                        )
                        signal_open = False
                        break
                    elif documents.get("type") == "checkpoint":
                        checkpoint.set_checkpoint(
                            documents.get("data")[0],
                            documents.get("data")[1],
                            documents.get("data")[2],
                        )
                        break
                    else:
                        documents_to_index.extend(documents.get("data"))
                # If the last batch pulled from the queue pushed documents_to_index
                # past the permitted chunk size, split the accumulated documents
                # back into chunks within the limit before indexing.
                for chunk in split_documents_into_equal_chunks(
                    documents_to_index, BATCH_SIZE
                ):
                    self.index_documents(chunk)
        except Exception as exception:
            self.logger.error(
                f"Error while indexing the documents to Enterprise Search. Error: {exception}"
            )
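

# ---------------------------------------------------------------------------
# For reference, a plausible shape of the imported helper (a sketch, assuming
# the real .utils implementation simply slices the list): return successive
# slices of at most `chunk_size` documents so no indexing request exceeds
# BATCH_SIZE. This is an assumption shown only to make the batching logic
# above self-explanatory; the connector's real helper may differ in detail.
# ---------------------------------------------------------------------------
def _split_documents_into_equal_chunks_sketch(documents, chunk_size):
    # Slice the accumulated list into consecutive chunks of chunk_size items.
    return [documents[i:i + chunk_size] for i in range(0, len(documents), chunk_size)]


# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal, hypothetical
# wiring showing how a producer thread feeds the queue that perform_sync()
# consumes. The message shapes ("data", "signal_close") mirror the types
# handled above; _StubWorkplaceSearchClient and _example_run are illustrative
# stand-ins, not the connector's real objects.
# ---------------------------------------------------------------------------
import logging
import queue


class _StubWorkplaceSearchClient:
    """Hypothetical stand-in for workplace_search_custom_client; the real
    client wraps the Workplace Search API and returns per-document results."""

    def index_documents(self, documents, timeout):
        # Pretend every document was indexed without errors.
        return {"results": [{"id": doc.get("id"), "errors": []} for doc in documents]}


def _example_run(config):
    """Run one tiny sync. `config` is a placeholder here; the real connector
    passes its parsed configuration, which Checkpoint uses to persist state."""
    logger = logging.getLogger("ees_sharepoint")
    work_queue = queue.Queue()

    # Producer: enqueue one batch of documents, then the end signal that
    # makes perform_sync() exit its outer loop.
    def produce():
        work_queue.put({"type": "data", "data": [{"id": "1", "title": "Doc 1"}]})
        work_queue.put({"type": "signal_close"})

    syncer = SyncEnterpriseSearch(config, logger, _StubWorkplaceSearchClient(), work_queue)
    producer = threading.Thread(target=produce)
    producer.start()
    syncer.perform_sync()  # blocks on queue.get() until "signal_close" arrives
    producer.join()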