# app/services/elasticsearch.py

"""(c) 2025, Elastic Co. Author: Adhish Thite <adhish.thite@elastic.co> """ from typing import Optional import bigframes.pandas as bf from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk from config.logging_config import setup_logger from config.settings import ES_URL, ES_API_KEY, ES_VECTOR_INDEX_NAME logger = setup_logger(__name__) def get_elasticsearch_client() -> Elasticsearch: """ Create and return a singleton Elasticsearch client. Returns: Elasticsearch: The Elasticsearch client instance. """ if not hasattr(get_elasticsearch_client, "client"): get_elasticsearch_client.client = Elasticsearch(ES_URL).options( api_key=ES_API_KEY ) logger.info("🔗 Created new Elasticsearch client") return get_elasticsearch_client.client def create_elastic_index(es_client: Elasticsearch, index_name: str) -> None: """ Delete the Elasticsearch index if it exists, then create a new one. Args: es_client (Elasticsearch): The Elasticsearch client. index_name (str): The name of the index to be created. """ if es_client.indices.exists(index=index_name): es_client.indices.delete(index=index_name) logger.info(f"🗑️ Deleted existing Elasticsearch index: {index_name}") es_client.indices.create(index=index_name) logger.info(f"✨ Created new Elasticsearch index: {index_name}") def insert_dataframe_to_elasticsearch( es_client: Elasticsearch, index_name: str, dataframe: bf.DataFrame, doc_type: str = "kb", # 'kb' or 'news' chunk_size: int = 500, ) -> None: """ Insert the DataFrame into the Elasticsearch index in bulk, using chunks. Args: es_client (Elasticsearch): The Elasticsearch client. index_name (str): The name of the index to insert data into. dataframe (bf.DataFrame): The DataFrame to be inserted. doc_type (str): The type of document ('kb' or 'news'). Defaults to 'kb'. chunk_size (int): The number of documents to insert in each bulk operation. Defaults to 500. """ def generate_actions(): for _, row in dataframe.iterrows(): doc = row.to_dict() doc["doc_type"] = doc_type # Add document type yield {"_index": index_name, "_source": doc} total_documents = len(dataframe) success, _ = bulk( es_client, generate_actions(), chunk_size=chunk_size, refresh=True ) logger.info( f"📥 Inserted {success}/{total_documents} {doc_type} documents into Elasticsearch index: {index_name}" ) def check_article_id_and_hash( client: Elasticsearch, index: str, article_id: str ) -> Optional[str]: """Check if an article with the given ID exists and return its hash if it does.""" query = {"query": {"term": {"metadata.article_id": article_id}}} response = client.search(index=index, body=query) if response["hits"]["total"]["value"] > 0: return response["hits"]["hits"][0]["_source"]["article_hash"] return None def delete_embeddings_by_article_id(client: Elasticsearch, index: str, article_id: str): """Delete all documents for an article with the given ID from the given index.""" query = {"query": {"term": {"metadata.article_id": article_id}}} client.delete_by_query(index=index, body=query) logger.info(f"🗑️ Deleted embeddings for article {article_id}") def create_vector_index(client: Elasticsearch): """Create a vector index.""" client.indices.create( index=ES_VECTOR_INDEX_NAME, ignore=400, body={ "mappings": { "properties": { "embedding": { "type": "dense_vector", "dims": 1536, }, "page_content": {"type": "text"}, "metadata": {"type": "object"}, "article_id": {"type": "keyword"}, "chunk_id": {"type": "text"}, "article_hash": {"type": "keyword"}, } } }, )