in ees_microsoft_outlook/sync_enterprise_search.py [0:0]
def perform_sync(self):
"""Pull documents from the queue and synchronize it to the Enterprise Search."""
try:
signal_open = True
while signal_open:
documents_to_index, deleted_document = [], []
while len(documents_to_index) < constant.BATCH_SIZE and len(str(documents_to_index)) < self.max_allowed_bytes:
queue_item = self.queue.get()
if queue_item.get("type") == constant.SIGNAL_CLOSE:
signal_open = False
break
elif queue_item.get("type") == constant.CHECKPOINT:
data = queue_item.get("data")
checkpoint_dict = {
"current_time": data[1],
"index_type": data[2],
"object_type": data[0],
}
self.checkpoint_list.append(checkpoint_dict)
break
elif queue_item.get("type") == "deletion":
deleted_document.extend(queue_item.get("data"))
else:
documents_to_index.extend(queue_item.get("data"))
# This loop is to ensure if the last document fetched from the queue exceeds the size of
# documents_to_index to more than the permitted chunk size, then we split the documents as per the limit
if documents_to_index:
for chunk in split_documents_into_equal_chunks(
documents_to_index, constant.BATCH_SIZE
):
for documents in split_documents_into_equal_bytes(
chunk, self.max_allowed_bytes
):
self.index_documents(documents)
if deleted_document:
for chunk in split_documents_into_equal_chunks(
deleted_document, constant.BATCH_SIZE
):
self.delete_documents(chunk)
if not signal_open:
break
except Exception as exception:
self.logger.info(f"Error while indexing the objects. Error: {exception}")