ees_microsoft_teams/sync_enterprise_search.py

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import pandas as pd
from iteration_utilities import unique_everseen

from . import constant
from .utils import (split_documents_into_equal_chunks, split_list_into_buckets,
                    split_documents_into_equal_bytes, is_document_in_present_data)

PERMISSION_LIMIT = 1024


class IndexingError(Exception):
    """Exception raised when indexing fails.

    Attributes:
        errors - errors found while indexing the documents to Workplace Search
    """

    def __init__(self, errors):
        super().__init__(f"Error while indexing the documents to Workplace Search. Errors: {errors}.")
        self.errors = errors


class SyncEnterpriseSearch:
    """This class allows ingesting documents to Elastic Enterprise Search."""

    def __init__(self, config, logger, workplace_search_custom_client, queue):
        self.logger = logger
        self.workplace_search_custom_client = workplace_search_custom_client
        self.ws_source = config.get_value("enterprise_search.source_id")
        self.enterprise_search_thread_count = config.get_value(
            "enterprise_search_sync_thread_count"
        )
        self.queue = queue
        self.checkpoint_list = []
        self.permission_list_to_index = []
        self.max_allowed_bytes = 10000000

    def get_records_by_types(self, documents):
        """Groups the documents by their type.
        :param documents: Documents to be indexed
        Returns:
            data_frame_dict: Dictionary mapping each document type to its count
        """
        if not documents:
            return {}
        data_frame = pd.DataFrame(documents)
        data_frame_size = data_frame.groupby("type").size()
        data_frame_dict = data_frame_size.to_dict()
        return data_frame_dict

    def fetch_documents_by_id(self, response, documents):
        """Filters the documents that failed while indexing
        :param response: Response returned by Workplace Search
        :param documents: Documents to be indexed into Workplace Search
        """
        return list(filter(lambda seq: is_document_in_present_data(seq, response["id"], "id"), documents))

    def index_documents(self, documents):
        """Indexes the documents into Workplace Search.
        :param documents: Documents to be indexed into Workplace Search
        """
        if documents:
            total_records_dict = self.get_records_by_types(documents)
            for chunk in split_list_into_buckets(documents, constant.BATCH_SIZE):
                try:
                    response = self.workplace_search_custom_client.index_documents(
                        chunk, constant.CONNECTION_TIMEOUT
                    )
                except IndexingError as exception:
                    raise IndexingError(exception)
                for result in response["results"]:
                    if result["errors"]:
                        failed_document_list = self.fetch_documents_by_id(result, documents)
                        # Remove the failed documents from the successfully indexed document count
                        documents = [document for document in documents if document not in failed_document_list]
                        self.logger.error(
                            f"Error while indexing {result['id']}. Error: {result['errors']}"
                        )
            total_inserted_record_dict = self.get_records_by_types(documents)
            for type, count in total_records_dict.items():
                self.logger.info(
                    f"Total {total_inserted_record_dict[type]} {type} indexed out of {count}."
                    if total_inserted_record_dict
                    else f"Total 0 {type} indexed out of {count}"
                )

    def workplace_add_permission(self, permission_dict):
        """Indexes the user permissions into Workplace Search.
        :param permission_dict: Dictionary mapping each user name to the permissions
            to be indexed for that user
        """
        try:
            for user, roles in permission_dict.items():
                user_permissions = split_documents_into_equal_chunks(
                    roles, PERMISSION_LIMIT
                )
                ws_permissions = self.workplace_search_custom_client.list_permissions()
                for permissions in user_permissions:
                    if ws_permissions:
                        # Merge the permissions already present in Workplace Search for this user
                        permission_lists = ws_permissions["results"]
                        for list_permission in permission_lists:
                            user_name = (
                                list_permission["user"]
                                if "user" in list_permission
                                else list_permission["external_user_properties"][0][
                                    "attribute_value"
                                ]
                            )
                            if user == user_name:
                                if list_permission["permissions"]:
                                    permissions.extend(list_permission["permissions"])
                    self.workplace_search_custom_client.add_permissions(
                        user,
                        list(set(permissions)),
                    )
                    self.logger.info(
                        f"Successfully indexed the permissions for {user} user into the "
                        "Workplace Search"
                    )
        except Exception as exception:
            self.logger.exception(
                "Error while indexing the permissions to the workplace. "
                f"Error: {exception}"
            )
            raise exception

    def delete_documents(self, final_deleted_list):
        """Deletes the documents with the specified ids from Workplace Search
        :param final_deleted_list: List of ids of the documents to delete from Workplace Search
        """
        for index in range(0, len(final_deleted_list), constant.BATCH_SIZE):
            final_list = final_deleted_list[index: index + constant.BATCH_SIZE]
            # Delete one batch of documents from Workplace Search
            self.workplace_search_custom_client.delete_documents(final_list)

    def perform_sync(self):
        """Pulls documents from the queue and synchronizes them to Enterprise Search."""
        signal_open = True
        while signal_open:
            documents_to_index, deleted_document = [], []
            while (
                len(documents_to_index) < constant.BATCH_SIZE
                and len(str(documents_to_index)) < self.max_allowed_bytes
            ):
                documents = self.queue.get()
                if documents.get("type") == "signal_close":
                    signal_open = False
                    break
                elif documents.get("type") == "checkpoint":
                    self.checkpoint_list.append(documents)
                    break
                elif documents.get("type") == "permissions":
                    self.permission_list_to_index.append(documents.get("data"))
                elif documents.get("type") == "deletion":
                    deleted_document.extend(documents.get("data"))
                else:
                    documents_to_index.extend(documents.get("data"))
            if documents_to_index:
                documents_to_index = list(unique_everseen(documents_to_index))
                for chunk in split_documents_into_equal_chunks(
                    documents_to_index, constant.BATCH_SIZE
                ):
                    # Split each batch further by payload size before indexing
                    for documents in split_documents_into_equal_bytes(
                        chunk, self.max_allowed_bytes
                    ):
                        self.index_documents(documents)
            if deleted_document:
                deleted_document = list(unique_everseen(deleted_document))
                for chunk in split_documents_into_equal_chunks(
                    deleted_document, constant.BATCH_SIZE
                ):
                    self.delete_documents(chunk)
            if not signal_open:
                break
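
The sketch below is a minimal, illustrative example (not part of the module) of the queue protocol that perform_sync() consumes: producers push dicts carrying a "type" and, for most types, a "data" payload, and a "signal_close" item ends the loop. The stub configuration, the mocked Workplace Search client, and the sample queue items are assumptions for demonstration only; the real connector builds these objects from its own configuration and client modules.

# Hypothetical usage sketch for SyncEnterpriseSearch; all stubs below are assumptions.
import logging
import queue
from unittest.mock import MagicMock

from ees_microsoft_teams.sync_enterprise_search import SyncEnterpriseSearch  # assumes the package layout above

# Stub config exposing get_value(), as used in SyncEnterpriseSearch.__init__.
config = MagicMock()
config.get_value.side_effect = lambda key: {
    "enterprise_search.source_id": "dummy-source-id",
    "enterprise_search_sync_thread_count": 1,
}[key]

# Mocked Workplace Search client returning an empty "results" list, i.e. no indexing errors.
client = MagicMock()
client.index_documents.return_value = {"results": []}

work_queue = queue.Queue()
# Anything that is not signal_close/checkpoint/permissions/deletion is treated as documents to index.
work_queue.put({"type": "document", "data": [{"id": "1", "type": "teams", "title": "General"}]})
work_queue.put({"type": "deletion", "data": ["stale-document-id"]})
work_queue.put({"type": "signal_close"})  # tells perform_sync() to stop pulling from the queue

syncer = SyncEnterpriseSearch(config, logging.getLogger(__name__), client, work_queue)
syncer.perform_sync()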