in supporting-blog-content/building-multimodal-rag-with-elasticsearch-gotham/src/elastic_manager.py [0:0]
def search_similar(self, query_embedding, modality=None, k=5):
    """Run a kNN search on the ``embedding`` field and return scored hits.

    Args:
        query_embedding: Query vector. Either an object exposing
            ``tolist()`` (e.g. a NumPy array) or any plain iterable of
            numbers.
        modality: When truthy, hits are restricted to documents whose
            ``modality`` field equals this value (``term`` filter). When
            falsy, no filter is applied.
        k: Number of nearest neighbours requested; also passed as the
            response ``size``.

    Returns:
        A list with one dict per hit, made of the hit's ``_source`` fields
        plus a ``score`` key holding the hit's ``_score`` (which overrides
        any ``score`` field already present in ``_source``). If the search
        fails, the error is printed and an empty list is returned so that
        callers can always iterate over the result.
    """
    # The vector has to be sent as a plain JSON-serialisable list.
    if hasattr(query_embedding, "tolist"):
        vector = query_embedding.tolist()
    else:
        vector = list(query_embedding)

    query = {
        "knn": {
            "field": "embedding",
            "query_vector": vector,
            "k": k,
            # Elasticsearch rejects num_candidates < k, so widen the
            # default candidate pool of 100 when a larger k is requested.
            "num_candidates": max(k, 100),
            "filter": [{"term": {"modality": modality}}] if modality else [],
        }
    }

    try:
        response = self.es.search(index=self.index_name, query=query, size=k)
        # Return both source data and score for each hit
        return [
            {**hit["_source"], "score": hit["_score"]}
            for hit in response["hits"]["hits"]
        ]
    except Exception as e:
        # Deliberately best-effort: report the failure and hand back an
        # empty result with the same shape as the success path, instead of
        # an error string that callers would iterate character by character.
        print(f"Error: processing search_similar: {str(e)}")
        return []