ees_network_drive/sync_enterprise_search.py

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module allows syncing data to Elastic Enterprise Search.

It's possible to run full syncs and incremental syncs with this module.
"""
import threading

from .constant import BATCH_SIZE, CONNECTION_TIMEOUT
from .utils import split_documents_into_equal_chunks


class SyncEnterpriseSearch:
    """This class contains common logic for indexing documents to Workplace Search."""

    def __init__(self, config, logger, workplace_search_custom_client, queue):
        self.logger = logger
        self.workplace_search_custom_client = workplace_search_custom_client
        self.queue = queue
        self.ws_source = config.get_value("enterprise_search.source_id")
        self.enterprise_search_sync_thread_count = config.get_value(
            "enterprise_search_sync_thread_count"
        )
        self.total_document_indexed = 0
        self.total_documents_found = 0

    def index_documents(self, documents):
        """Indexes the documents to Enterprise Search.
        :param documents: list of documents to be indexed
        """
        self.total_documents_found += len(documents)
        if documents:
            documents_indexed = 0
            responses = self.workplace_search_custom_client.index_documents(
                documents,
                CONNECTION_TIMEOUT,
            )
            # Count successfully indexed documents and log each per-document failure.
            for document in responses["results"]:
                if not document["errors"]:
                    documents_indexed += 1
                else:
                    self.logger.error(
                        f"Unable to index the document with id: {document['id']} Error {document['errors']}"
                    )
            self.total_document_indexed += documents_indexed

    def perform_sync(self):
        """Pulls documents from the queue and synchronizes them to Enterprise Search."""
        try:
            signal_open = True
            while signal_open:
                documents_to_index = []
                while len(documents_to_index) < BATCH_SIZE:
                    document = self.queue.get()
                    if document.get("type") == "signal_close":
                        self.logger.info(
                            f"Found an end signal in the queue. Closing Thread ID {threading.get_ident()}"
                        )
                        signal_open = False
                        break
                    else:
                        documents_to_index.extend(document.get("data"))
                # If the last payload fetched from the queue pushed documents_to_index
                # past the permitted chunk size, split it into chunks of at most
                # BATCH_SIZE before indexing.
                for document_list in split_documents_into_equal_chunks(documents_to_index, BATCH_SIZE):
                    self.index_documents(document_list)
        except Exception as exception:
            self.logger.error(exception)
        self.logger.info(
            f"Thread ID: {threading.get_ident()} Total {self.total_document_indexed} documents "
            f"indexed out of: {self.total_documents_found} till now.."
        )
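

# ---------------------------------------------------------------------------
# Illustrative usage sketch; NOT part of the original module. It shows how a
# caller might wire SyncEnterpriseSearch to a shared queue and a pool of
# consumer threads. `StubConfig` and `StubWorkplaceSearchClient` are
# hypothetical stand-ins for the connector's real configuration object and
# Workplace Search client wrapper; the queue payload shape is assumed from
# how perform_sync() reads it: producers enqueue {"data": [...]} entries,
# plus exactly one {"type": "signal_close"} entry per consumer thread so
# each perform_sync() loop can exit.
# ---------------------------------------------------------------------------
import logging
from queue import Queue

from ees_network_drive.sync_enterprise_search import SyncEnterpriseSearch


class StubConfig:
    """Hypothetical config exposing the get_value() interface used above."""

    def get_value(self, key):
        return {
            "enterprise_search.source_id": "source-id",
            "enterprise_search_sync_thread_count": 2,
        }.get(key)


class StubWorkplaceSearchClient:
    """Hypothetical client wrapper that reports every document as indexed."""

    def index_documents(self, documents, timeout):
        return {"results": [{"id": doc.get("id"), "errors": []} for doc in documents]}


def main():
    logger = logging.getLogger("ees_network_drive")
    queue = Queue()
    thread_count = 2

    # A real connector's producer threads would fill the queue; here we put
    # one small batch plus one close signal per consumer thread.
    queue.put({"data": [{"id": "1", "title": "example.txt"}]})
    for _ in range(thread_count):
        queue.put({"type": "signal_close"})

    threads = []
    for _ in range(thread_count):
        syncer = SyncEnterpriseSearch(
            StubConfig(), logger, StubWorkplaceSearchClient(), queue
        )
        thread = threading.Thread(target=syncer.perform_sync)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()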