in ees_sharepoint/sync_enterprise_search.py [0:0]
def perform_sync(self):
    """Pull documents from the queue and synchronize them to the Enterprise Search.

    Messages are read from ``self.queue`` until one whose ``"type"`` is
    ``"signal_close"`` arrives. Every message is accessed with ``.get`` and
    is handled according to its ``"type"``:

    * ``"signal_close"`` -- index whatever has been collected, then stop.
    * ``"checkpoint"`` -- the first three items of ``"data"`` are the
      positional arguments for ``Checkpoint.set_checkpoint``. The documents
      collected so far are indexed first and the checkpoint is recorded
      only afterwards, so a failed indexing call never leaves a checkpoint
      that is ahead of the documents this worker has actually sent.
    * any other type -- ``"data"`` is an iterable of documents that is
      appended to the current batch.

    Any exception raised while processing is logged through
    ``self.logger.error`` and ends the loop; nothing is re-raised.
    """
    try:
        checkpoint = Checkpoint(self.config, self.logger)
        signal_open = True
        while signal_open:
            documents_to_index = []
            # Arguments of a checkpoint received in this round, applied only
            # after the documents collected before it have been indexed.
            pending_checkpoint = None

            while len(documents_to_index) < BATCH_SIZE:
                documents = self.queue.get()
                message_type = documents.get("type")

                if message_type == "signal_close":
                    self.logger.info(
                        f"Found an end signal in the queue. Closing Thread ID {threading.get_ident()}"
                    )
                    signal_open = False
                    break

                if message_type == "checkpoint":
                    checkpoint_data = documents.get("data")
                    pending_checkpoint = (
                        checkpoint_data[0],
                        checkpoint_data[1],
                        checkpoint_data[2],
                    )
                    break

                documents_to_index.extend(documents.get("data"))

            # The last message taken from the queue can push the batch past
            # BATCH_SIZE, so split it into chunks of at most BATCH_SIZE
            # documents before indexing.
            for chunk in split_documents_into_equal_chunks(
                documents_to_index, BATCH_SIZE
            ):
                self.index_documents(chunk)

            # Record the checkpoint only once the preceding documents have
            # been handed to index_documents without raising.
            if pending_checkpoint is not None:
                checkpoint.set_checkpoint(*pending_checkpoint)
    except Exception as exception:
        self.logger.error(
            f"Error while indexing the documents to the Enterprise Search. Error {exception}"
        )