# ees_microsoft_outlook/sync_enterprise_search.py
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
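"""Module that syncs documents pulled from the shared queue into Elastic Enterprise Search (Workplace Search)."""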
import collections
import copy

from . import constant
from .utils import split_documents_into_equal_bytes, split_documents_into_equal_chunks


class SyncEnterpriseSearch:
"""This class allows ingesting documents to Elastic Enterprise Search."""

    def __init__(self, config, logger, workplace_search_custom_client, queue):
self.config = config
self.logger = logger
self.workplace_search_custom_client = workplace_search_custom_client
self.ws_source = config.get_value("enterprise_search.source_id")
self.ws_auth = config.get_value("enterprise_search.api_key")
self.enterprise_search_thread_count = config.get_value(
"enterprise_search_sync_thread_count"
)
self.queue = queue
self.checkpoint_list = []
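        # Upper bound, in bytes, on the size of a single batch sent for indexing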
self.max_allowed_bytes = 10000000

    def index_documents(self, documents):
        """Indexes the documents into Enterprise Search.
:param documents: Documents to be indexed
"""
try:
if documents:
error_count = 0
documents_dict = collections.defaultdict(dict)
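                # Map each document id to its document so failed ids can be traced back to a type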
for document in documents:
documents_dict[document["id"]] = document
total_records_dict = self.get_records_by_types(documents)
total_inserted_record_dict = copy.deepcopy(total_records_dict)
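                # Start from all fetched ids per type; ids of failed documents are removed below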
responses = self.workplace_search_custom_client.index_documents(
documents,
constant.CONNECTION_TIMEOUT,
)
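                # Each entry in responses["results"] reports per-document "errors", if any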
if responses:
                    for each in responses["results"]:
                        if each["errors"]:
                            # Remove the failed document from the successfully indexed ids
                            error_count += 1
                            doc_type = documents_dict[each["id"]]["type"]
                            total_inserted_record_dict[doc_type].remove(each["id"])
                if not total_records_dict:
                    self.logger.info("There is no record found to index into Workplace Search")
                for doc_type, record_ids in total_records_dict.items():
                    self.logger.info(
                        f"Total {doc_type} indexed: {len(total_inserted_record_dict[doc_type])} out of {len(record_ids)}."
                    )
                if error_count:
                    self.logger.info(
                        f"Total {error_count} documents were skipped due to errors; they will be synced in the next full-sync cycle"
                    )
        except Exception as exception:
            self.logger.error(
                f"Error while indexing {len(documents)} documents into Workplace Search. Error: {exception}"
            )

    def get_records_by_types(self, documents):
        """Groups the documents by their type.
        :param documents: Documents to be indexed
        Returns:
            dict_count: Dictionary mapping each document type to the list of its document ids
        """
if not documents:
return {}
dict_count = collections.defaultdict(list)
for item in documents:
dict_count[item["type"]].append(item["id"])
return dict_count

    def delete_documents(self, final_deleted_list):
        """Deletes the documents with the specified ids from Workplace Search
        :param final_deleted_list: List of ids of the documents to delete from Workplace Search
"""
for index in range(0, len(final_deleted_list), constant.BATCH_SIZE):
final_list = final_deleted_list[index: index + constant.BATCH_SIZE]
            # Delete the current batch of document ids from Workplace Search
self.workplace_search_custom_client.delete_documents(final_list)

    def perform_sync(self):
        """Pulls documents from the queue and synchronizes them to the Enterprise Search."""
try:
signal_open = True
while signal_open:
documents_to_index, deleted_document = [], []
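                # Accumulate queue items until the batch reaches BATCH_SIZE documents
                # or its stringified size crosses the maximum allowed payload bytes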
                while (
                    len(documents_to_index) < constant.BATCH_SIZE
                    and len(str(documents_to_index)) < self.max_allowed_bytes
                ):
queue_item = self.queue.get()
if queue_item.get("type") == constant.SIGNAL_CLOSE:
signal_open = False
break
elif queue_item.get("type") == constant.CHECKPOINT:
data = queue_item.get("data")
checkpoint_dict = {
"current_time": data[1],
"index_type": data[2],
"object_type": data[0],
}
self.checkpoint_list.append(checkpoint_dict)
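                        # Flush the batch collected so far once the checkpoint is recorded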
break
elif queue_item.get("type") == "deletion":
deleted_document.extend(queue_item.get("data"))
else:
documents_to_index.extend(queue_item.get("data"))
            # If the items pulled from the queue pushed documents_to_index past the
            # permitted chunk size or byte limit, split the batch back down to those limits
if documents_to_index:
for chunk in split_documents_into_equal_chunks(
documents_to_index, constant.BATCH_SIZE
):
for documents in split_documents_into_equal_bytes(
chunk, self.max_allowed_bytes
):
self.index_documents(documents)
if deleted_document:
for chunk in split_documents_into_equal_chunks(
deleted_document, constant.BATCH_SIZE
):
self.delete_documents(chunk)
        except Exception as exception:
            self.logger.error(f"Error while syncing the objects to Workplace Search. Error: {exception}")