def search_similar()

in supporting-blog-content/building-multimodal-rag-with-elasticsearch-gotham/src/elastic_manager.py [0:0]


    def search_similar(self, query_embedding, modality=None, k=5):
        """Run a kNN vector search and return the best-matching documents.

        Args:
            query_embedding: Query vector. Objects exposing ``tolist()``
                are converted through it; any other iterable is copied
                into a plain list.
            modality: Optional value. When truthy, results are restricted
                with a term filter on the ``modality`` field; otherwise
                no filter clause is sent.
            k: Number of nearest neighbours requested; also passed as the
                ``size`` of the search.

        Returns:
            A list with one dict per hit, made of the hit's ``_source``
            fields plus a ``score`` key holding the hit's ``_score``.
            If the search or the response handling raises, the error is
            printed and the string ``"Error generating search evidence"``
            is returned instead of a list, so callers should check the
            type before iterating.
        """
        if hasattr(query_embedding, "tolist"):
            vector = query_embedding.tolist()
        else:
            vector = list(query_embedding)

        knn = {
            "field": "embedding",
            "query_vector": vector,
            "k": k,
            # The candidate pool must not be smaller than k, otherwise the
            # request is rejected; keep 100 as the floor used so far.
            "num_candidates": max(100, k),
        }
        # Only attach a filter clause when there is something to filter on.
        if modality:
            knn["filter"] = [{"term": {"modality": modality}}]

        try:
            response = self.es.search(
                index=self.index_name, query={"knn": knn}, size=k
            )

            # Return both source data and score for each hit
            return [
                {**hit["_source"], "score": hit["_score"]}
                for hit in response["hits"]["hits"]
            ]

        except Exception as e:
            # Message and sentinel text are kept unchanged for existing
            # callers and log consumers.
            print(f"Error: processing search_evidence: {str(e)}")
            return "Error generating search evidence"