in 11-recommendation/helper_functions.py [0:0]
def query_qdrant(query: str, category: str, top_k: int = 10) -> List[Dict[str, Any]]:
    """
    Embed a text query and return the most similar items from Qdrant within one category.

    The query string is converted to a vector with ``generate_embeddings`` and
    searched against ``Config.COLLECTION_NAME``, keeping only points whose
    ``category`` payload field matches ``category``.

    This function does not raise: an empty query, or any exception raised while
    embedding or searching, is logged and reported as an empty list.

    :param query: The query string to embed and search for.
    :param category: Value the ``category`` payload field must match.
    :param top_k: Maximum number of hits to retrieve.
    :return: One dictionary per hit with the keys ``"id"`` and ``"payload"``;
        an empty list when the query is empty or the lookup fails.
    """
    logger.info(f"Querying Qdrant with category: '{category}' with query: '{query}'")

    # Guard clause: there is nothing to embed for an empty query.
    if not query:
        logger.error("Empty query string. Cannot perform query.")
        return []

    try:
        # Generate embeddings for the query.
        query_vector = generate_embeddings(query)

        # Restrict the search to points whose payload category matches.
        category_filter = models.Filter(
            must=[
                models.FieldCondition(
                    key="category",
                    match=models.MatchValue(value=category),
                )
            ]
        )

        # NOTE(review): newer qdrant-client releases appear to deprecate
        # `search` in favour of `query_points` - confirm against the pinned
        # client version before upgrading.
        search_result = qdrant_client.search(
            collection_name=Config.COLLECTION_NAME,
            query_vector=query_vector,
            limit=top_k,
            query_filter=category_filter,
            search_params=models.SearchParams(exact=False),
        )
        logger.info(f"Query successful. Retrieved {len(search_result)} results.")

        # Expose only the identifier and payload of each hit to callers.
        return [{"id": hit.id, "payload": hit.payload} for hit in search_result]
    except Exception as e:
        # Best-effort boundary: callers expect a list, never an exception.
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception(f"Failed to query Qdrant: {e}")
        return []