rally-custom/custom_tracks/opensearch/dense_vector/track.py (232 lines of code) (raw):

import functools
import json
import logging
import os
import statistics
from collections import defaultdict
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


def compute_percentile(data: List[Any], percentile):
    """Return the nearest-rank percentile of *data*, or None if *data* is empty.

    The rank is ``round(percentile * n / 100) - 1`` taken from a sorted copy,
    clamped into ``[0, n - 1]`` so out-of-range percentiles (e.g. 0 or > 100)
    still yield a valid element.
    """
    size = len(data)
    if size <= 0:
        return None
    sorted_data = sorted(data)
    index = int(round(percentile * size / 100)) - 1
    return sorted_data[max(min(index, size - 1), 0)]


def load_query_vectors(queries_file) -> Dict[int, List[float]]:
    """Load one JSON-encoded vector per line from *queries_file*.

    Returns a mapping from the zero-based line number to the parsed vector.
    Raises ValueError when the path does not exist or is not a regular file.
    """
    if not (os.path.exists(queries_file) and os.path.isfile(queries_file)):
        raise ValueError(f"Provided queries file '{queries_file}' does not exist or is not a file")
    with open(queries_file, "r") as f:
        logger.debug(f"Reading query vectors from '{queries_file}'")
        # Stream line by line instead of materializing the whole file first;
        # json.loads tolerates the trailing newline on each line.
        query_vectors: Dict[int, List[float]] = {_index: json.loads(vector) for _index, vector in enumerate(f)}
        logger.debug(f"Finished reading query vectors from '{queries_file}'")
    return query_vectors


async def extract_exact_neighbors(
    query_vector: List[float], index: str, max_size: int, vector_field: str, request_cache: bool, client
) -> List[str]:
    """Compute exact nearest neighbors of *query_vector* over *index*.

    Uses a script-score scan (cosine similarity via the "knn" script language)
    and returns the document ids of the top *max_size* hits.
    """
    script_query = {
        "_source": False,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "knn_score",
                    "lang": "knn",
                    "params": {
                        "field": vector_field,
                        "query_value": query_vector,
                        "space_type": "cosinesimil"
                    }
                }
            }
        }
    }
    script_result = await client.search(
        body=script_query,
        index=index,
        request_cache=request_cache,
        size=max_size,
    )
    return [hit["_id"] for hit in script_result["hits"]["hits"]]


async def put_index_settings(es, params):
    """Runner: apply ``params["body"]`` as dynamic settings to ``params["index"]``."""
    await es.perform_request(
        method="PUT",
        path=f'/{params["index"]}/_settings',
        body=params["body"]
    )


class KnnVectorStore:
    """Caches exact nearest-neighbor results per (index, query id).

    Query vectors are loaded once from a file; exact neighbors are computed
    lazily with script-score queries and memoized until invalidated.
    """

    @staticmethod
    def empty_store():
        # index name -> query id -> list of neighbor doc ids
        return defaultdict(lambda: defaultdict(list))

    def __init__(self, queries_file: str, vector_field: str):
        assert queries_file and vector_field
        self._query_vectors = load_query_vectors(queries_file)
        self._vector_field = vector_field
        self._store = KnnVectorStore.empty_store()

    async def get_neighbors_for_query(self, index: str, query_id: int, size: int, request_cache: bool, client) -> List[str]:
        """Return cached exact neighbors for *query_id*, computing and caching
        them when absent or when fewer than *size* results are cached.

        Best-effort: on any failure the error is logged and an empty list is
        returned so a single bad query does not abort the whole recall run.
        """
        try:
            logger.debug(f"Fetching exact neighbors for {query_id} from in-memory store")
            exact_neighbors = self._store[index][query_id]
            if not exact_neighbors or len(exact_neighbors) < size:
                logger.debug(f"Query vector with id {query_id} not cached or has fewer than {size} requested results - computing neighbors")
                self._store[index][query_id] = await self.load_exact_neighbors(index, query_id, size, request_cache, client)
                logger.debug(f"Finished computing exact neighbors for {query_id} - it's now cached!")
            return self._store[index][query_id]
        except Exception:
            # Bug fix: the previous code passed the exception object as an extra
            # %-format argument with no placeholder, which made the logging
            # framework emit a formatting error. logger.exception() already
            # attaches the traceback, so no extra argument is needed.
            logger.exception(f"Failed to compute nearest neighbors for '{query_id}'. Returning empty results instead.")
            return []

    async def load_exact_neighbors(self, index: str, query_id: int, max_size: int, request_cache: bool, client):
        """Compute exact neighbors for a known query id.

        Raises ValueError when *query_id* was not present in the queries file.
        """
        if query_id not in self._query_vectors:
            raise ValueError(f"Unknown query with id: '{query_id}' provided")
        return await extract_exact_neighbors(self._query_vectors[query_id], index, max_size, self._vector_field, request_cache, client)

    def invalidate_all(self):
        """Drop every cached neighbor list; the loaded query vectors are kept."""
        logger.info("Invalidating all entries from knn-vector-store")
        self._store = KnnVectorStore.empty_store()

    def get_query_vectors(self) -> Dict[int, List[float]]:
        if len(self._query_vectors) == 0:
            raise ValueError("Query vectors have not been initialized.")
        return self._query_vectors

    @classmethod
    @functools.lru_cache(maxsize=1)
    def get_instance(cls, queries_file: str, vector_field):
        """Return the process-wide store for this (file, field) pair, memoized
        so every recall task shares one cache."""
        logger.info(f"Initializing KnnVectorStore for queries file: '{queries_file}' and vector field: '{vector_field}'")
        return KnnVectorStore(queries_file, vector_field)


class KnnParamSource:
    """Parameter source that cycles through the vectors in queries.json and
    emits either approximate knn queries or exact script-score queries."""

    def __init__(self, track, params, **kwargs):
        # choose a suitable index: if there is only one defined for this track
        # choose that one, but let the user always override index
        if len(track.indices) == 1:
            default_index = track.indices[0].name
        else:
            default_index = "_all"

        self._index_name = params.get("index", default_index)
        self._cache = params.get("cache", False)
        self._exact_scan = params.get("exact", False)
        self._params = params
        self._queries = []

        cwd = os.path.dirname(__file__)
        with open(os.path.join(cwd, "queries.json"), "r") as file:
            for line in file:
                self._queries.append(json.loads(line))

        self._iters = 0
        self.infinite = True
        self._vector_field = "vector"

    def partition(self, partition_index, total_partitions):
        # A single shared source serves all client partitions.
        return self

    def params(self):
        """Build the request parameters for the next query vector."""
        result = {
            "index": self._index_name,
            # Consistency fix: reuse the value captured in __init__ (same
            # lookup and default) instead of re-reading the raw params.
            "cache": self._cache,
            "size": self._params.get("k", 10)
        }
        if self._exact_scan:
            # Exact baseline: brute-force script scoring over all documents.
            result["body"] = {
                "_source": False,
                "query": {
                    "script_score": {
                        "query": {"match_all": {}},
                        "script": {
                            "source": "knn_score",
                            "lang": "knn",
                            "params": {
                                "field": self._vector_field,
                                "query_value": self._queries[self._iters],
                                "space_type": "cosinesimil"
                            }
                        }
                    }
                }
            }
        else:
            # Approximate knn query against the vector field.
            result["body"] = {
                "query": {
                    "knn": {
                        self._vector_field: {
                            "vector": self._queries[self._iters],
                            "k": self._params.get("num-candidates", 10)
                        }
                    }
                },
                "_source": False
            }
        # Wrap around so the source can run indefinitely.
        self._iters += 1
        if self._iters >= len(self._queries):
            self._iters = 0
        return result


# For each query this will generate both the knn query and the equivalent
# score script query. The two queries can then be executed and used
# to gauge the accuracy of the knn query.
class KnnRecallParamSource:
    """Parameter source for the recall benchmark: each invocation hands the
    knn-recall runner its sizes, the target index and a handle to the shared
    KnnVectorStore of ground-truth query vectors."""

    def __init__(self, track, params, **kwargs):
        # Single-index tracks default to that index; otherwise search across
        # all indices. An explicit "index" param always wins.
        indices = track.indices
        fallback = indices[0].name if len(indices) == 1 else "_all"

        self._index_name = params.get("index", fallback)
        self._cache = params.get("cache", False)
        self._params = params
        self.infinite = True
        self._target_k = 1_000

        here = os.path.dirname(__file__)
        self._queries_file: str = os.path.join(here, "queries.json")
        self._vector_field: str = "vector"

    def partition(self, partition_index, total_partitions):
        # One shared source serves every client partition.
        return self

    def params(self):
        """Assemble the parameter dict consumed by KnnRecallRunner."""
        opts = self._params
        return {
            "index": self._index_name,
            "cache": opts.get("cache", False),
            "size": opts.get("k", 10),
            "num_candidates": opts.get("num-candidates", 100),
            "target_k": self._target_k,
            "knn_vector_store": KnnVectorStore.get_instance(self._queries_file, self._vector_field),
            "invalidate_vector_store": opts.get("invalidate-vector-store", False),
        }


# Used in tandem with the KnnRecallParamSource. This executes both a knn query
# and an equivalent score script query. Results are then compared to gauge
# the accuracy of the knn query.
class KnnRecallRunner:
    """Runner that measures knn accuracy: for every stored query vector it
    executes an approximate knn search and compares the hits against exact
    (script-scored) neighbors served by the KnnVectorStore."""

    async def __call__(self, es, params):
        k = params["size"]
        num_candidates = params["num_candidates"]
        index = params["index"]
        request_cache = params["cache"]
        # Fetch exact neighbors at target_k >= k so the cached ground truth
        # can also serve later runs that request a larger k.
        target_k = max(params["target_k"], k)

        knn_vector_store: KnnVectorStore = params["knn_vector_store"]
        if params["invalidate_vector_store"]:
            knn_vector_store.invalidate_all()

        matched_total = 0
        expected_total = 0
        worst_recall = k
        for query_id, query_vector in knn_vector_store.get_query_vectors().items():
            approx_result = await es.search(
                body={
                    "query": {
                        "knn": {
                            "vector": {
                                "vector": query_vector,
                                "k": num_candidates
                            }
                        }
                    },
                    "_source": False,
                    "profile": True
                },
                index=index,
                request_cache=request_cache,
                size=k,
            )
            approx_ids = {hit["_id"] for hit in approx_result["hits"]["hits"]}
            exact_ids = await knn_vector_store.get_neighbors_for_query(index, query_id, target_k, request_cache, es)
            # Only the top k exact neighbors count towards recall.
            exact_ids = exact_ids[:k]

            matched = len(approx_ids & set(exact_ids))
            matched_total += matched
            expected_total += len(exact_ids)
            worst_recall = min(worst_recall, matched)

        # No exact results at all means recall is undefined for this run.
        if expected_total <= 0:
            return None
        return {
            "avg_recall": matched_total / expected_total,
            "min_recall": worst_recall,
            "k": k,
            "num_candidates": num_candidates
        }

    def __repr__(self, *args, **kwargs):
        return "knn-recall"


def register(registry):
    """Rally entry point: register the custom param sources and runners."""
    registry.register_param_source("knn-param-source", KnnParamSource)
    registry.register_param_source("knn-recall-param-source", KnnRecallParamSource)
    registry.register_runner("knn-recall", KnnRecallRunner(), async_runner=True)
    registry.register_runner("put-index-settings", put_index_settings, async_runner=True)