ees_microsoft_teams/enterprise_search_wrapper.py (178 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # """This module perform operations related to Enterprise Search based on the Enterprise Search version """ import elastic_transport from elastic_enterprise_search import WorkplaceSearch, __version__ from packaging import version ENTERPRISE_V8 = version.parse("8.0") class EnterpriseSearchWrapper: """This class contains operations related to Enterprise Search such as index documents, delete documents, etc.""" def __init__(self, logger, config, args): self.logger = logger self.version = version.parse(__version__) self.host = config.get_value("enterprise_search.host_url") self.api_key = config.get_value("enterprise_search.api_key") self.ws_source = config.get_value("enterprise_search.source_id") if self.version >= ENTERPRISE_V8: if hasattr(args, "user") and args.user: self.workplace_search_client = WorkplaceSearch( self.host, basic_auth=(args.user, args.password) ) else: self.workplace_search_client = WorkplaceSearch( self.host, bearer_auth=self.api_key, ) else: if hasattr(args, "user") and args.user: self.workplace_search_client = WorkplaceSearch( f"{self.host}/api/ws/v1/sources", http_auth=(args.user, args.password), ) else: self.workplace_search_client = WorkplaceSearch( f"{self.host}/api/ws/v1/sources", http_auth=self.api_key ) def add_permissions(self, user_name, permission_list): """Add one or more permission for a given user. Permissions are added atop the existing. :param user_name: user to assign permissions :param permission_list: list of permissions """ try: if self.version >= ENTERPRISE_V8: from elastic_enterprise_search.exceptions import ( BadRequestError, ConflictError) external_user_properties = [ { "attribute_name": "_elasticsearch_username", "attribute_value": user_name, } ] try: self.workplace_search_client.create_external_identity( content_source_id=self.ws_source, external_user_id=user_name, external_user_properties=external_user_properties, permissions=permission_list, ) except ConflictError: self.logger.debug( f"External entity :{user_name} already exits. Trying to update the existing permissions.." ) self.workplace_search_client.put_external_identity( content_source_id=self.ws_source, external_user_id=user_name, external_user_properties=external_user_properties, permissions=permission_list, ) except BadRequestError: raise ValueError("Incompatible version") else: try: self.workplace_search_client.add_user_permissions( content_source_id=self.ws_source, user=user_name, body={"permissions": permission_list}, ) except elastic_transport.exceptions.NotFoundError: raise ValueError("Incompatible version") self.logger.info( f"Successfully indexed the permissions for {user_name} user into the Workplace Search" ) except ValueError as error: raise ValueError(f"Please compare the Enterprise Search version used while running the installation \ to the version of Enterprise Search installed. Error: {error}") except Exception as exception: self.logger.exception( f"Error while indexing the permissions for user: {user_name} to the workplace. Error: {exception}" ) def list_permissions(self): """List permissions for one or all users""" user_permission = [] try: if self.version >= ENTERPRISE_V8: user_permission = self.workplace_search_client.list_external_identities( content_source_id=self.ws_source ) else: try: user_permission = self.workplace_search_client.list_permissions( content_source_id=self.ws_source, ) except elastic_transport.exceptions.NotFoundError: raise ValueError("Incompatible version") self.logger.info( "Successfully retrieves all permissions from the workplace" ) except ValueError as error: raise ValueError(f"Please compare the Enterprise Search version used while running the installation \ to the version of Enterprise Search installed. Error: {error}") except Exception as exception: self.logger.exception( f"Error while retrieving the permissions from the workplace. Error: {exception}" ) return user_permission def remove_permissions(self, permission): """Removes one or more permissions from an existing set of permissions :param permission: dictionary containing permission of particular user """ try: if self.version >= ENTERPRISE_V8: if permission.get("external_user_properties"): user_name = permission["external_user_properties"][0]["attribute_value"] self.workplace_search_client.delete_external_identity( content_source_id=self.ws_source, external_user_id=user_name ) else: raise ValueError("Incompatible version") else: try: user_name = permission["user"] self.workplace_search_client.remove_user_permissions( content_source_id=self.ws_source, user=user_name, body={"permissions": permission["permissions"]}, ) except elastic_transport.exceptions.NotFoundError: raise ValueError("Incompatible version") self.logger.info("Successfully removed the permissions from the Workplace Search.") except ValueError as error: raise ValueError(f"Please compare the Enterprise Search version used while running the installation \ to the version of Enterprise Search installed. Error: {error}") except Exception as exception: self.logger.exception( f"Error while removing the permissions from the workplace. Error: {exception}" ) def create_content_source(self, schema, display, name, is_searchable): """Create a content source :param schema: schema of the content source :param display: display schema for the content source :param name: name of the content source :param is_searchable: boolean to indicate source is searchable or not """ try: if self.version >= ENTERPRISE_V8: response = self.workplace_search_client.create_content_source( name=name, schema=schema, display=display, is_searchable=is_searchable, ) else: body = { "name": name, "schema": schema, "display": display, "is_searchable": is_searchable, } response = self.workplace_search_client.create_content_source(body=body) content_source_id = response.get("id") self.logger.info( f"Created ContentSource with ID {content_source_id}. \ You may now begin indexing with content-source-id= {content_source_id}" ) except Exception as exception: self.logger.error(f"Could not create a content source, Error {exception}") def delete_documents(self, document_ids): """Deletes a list of documents from a custom content source :param document_ids: list of document ids to be deleted from Enterprise Search """ try: self.workplace_search_client.delete_documents( content_source_id=self.ws_source, document_ids=document_ids, ) except Exception as exception: self.logger.exception( f"Error while checking for deleted files. Error: {exception}" ) def index_documents(self, documents, timeout): """Indexes one or more new documents into a custom content source, or updates one or more existing documents :param documents: list of documents to be indexed :param timeout: Timeout in seconds """ try: responses = self.workplace_search_client.index_documents( content_source_id=self.ws_source, documents=documents, request_timeout=timeout, ) except Exception as exception: self.logger.exception(f"Error while indexing the files. Error: {exception}") raise return responses