openai_vector/track.py (154 lines of code) (raw):

import bz2
import json
import logging
import os
import statistics
from typing import Any, List

logger = logging.getLogger(__name__)

QUERIES_FILENAME: str = "queries.json.bz2"
TRUE_KNN_FILENAME: str = "open_ai_true_top_1000.json.bz2"


def compute_percentile(data: List[Any], percentile):
    """Return the nearest-rank *percentile* of *data*, or ``None`` if empty."""
    count = len(data)
    if count <= 0:
        return None
    ranked = sorted(data)
    # Nearest-rank method: round(p * n / 100), converted to a 0-based index
    # and clamped into [0, count - 1].
    position = int(round(percentile * count / 100)) - 1
    return ranked[max(min(position, count - 1), 0)]


class KnnParamSource:
    """Rally parameter source that emits one knn search request per call,
    cycling endlessly through the query vectors bundled with the track."""

    def __init__(self, track, params, **kwargs):
        # Choose a suitable index: if there is only one defined for this track
        # choose that one, but let the user always override index.
        default_index = track.indices[0].name if len(track.indices) == 1 else "_all"
        self._index_name = params.get("index", default_index)
        self._cache = params.get("cache", False)
        self._params = params
        cwd = os.path.dirname(__file__)
        with bz2.open(os.path.join(cwd, QUERIES_FILENAME), "r") as queries_file:
            self._queries = [json.loads(vector_query) for vector_query in queries_file]
        self._iters = 0
        self._maxIters = len(self._queries)
        self.infinite = True

    def partition(self, partition_index, total_partitions):
        # A single shared source: every client cycles through the same queries.
        return self

    def params(self):
        """Build the next knn search request, wrapping around at the end."""
        result = {
            "index": self._index_name,
            "cache": self._params.get("cache", False),
            "size": self._params.get("k", 10),
        }
        num_candidates = self._params.get("num-candidates", 50)
        oversample = self._params.get("oversample", 0)
        knn_clause = {
            "field": "emb",
            "query_vector": self._queries[self._iters],
            "k": result["size"],
            "num_candidates": num_candidates,
        }
        if "filter" in self._params:
            knn_clause["filter"] = self._params["filter"]
        if oversample > 0:
            knn_clause["rescore_vector"] = {"oversample": oversample}
        result["body"] = {"query": {"knn": knn_clause}, "_source": False}
        self._iters += 1
        if self._iters >= self._maxIters:
            self._iters = 0
        return result
class KnnVectorStore:
    """Holds the track's query vectors and their true nearest-neighbor doc ids,
    both loaded from the bz2-compressed JSON files shipped with the track."""

    def __init__(self):
        cwd = os.path.dirname(__file__)
        self._query_nearest_neighbor_docids = []
        self._queries = []
        with bz2.open(os.path.join(cwd, TRUE_KNN_FILENAME), "r") as queries_file:
            for docids in queries_file:
                self._query_nearest_neighbor_docids.append(json.loads(docids))
        with bz2.open(os.path.join(cwd, QUERIES_FILENAME), "r") as queries_file:
            for vector_query in queries_file:
                self._queries.append(json.loads(vector_query))

    def get_query_vectors(self) -> List[List[float]]:
        """Return all query vectors, in file order."""
        return self._queries

    def get_neighbors_for_query(self, query_id: int, size: int) -> List[str]:
        """Return the first *size* exact nearest-neighbor doc ids for *query_id*.

        Raises:
            ValueError: if *query_id* is out of range, or *size* is negative or
                exceeds the number of stored neighbors for that query.
        """
        if (query_id < 0) or (query_id >= len(self._query_nearest_neighbor_docids)):
            raise ValueError(f"Unknown query with id: '{query_id}' provided")
        if (size < 0) or (size > len(self._query_nearest_neighbor_docids[query_id])):
            raise ValueError(f"Invalid size: '{size}' provided for query with id: '{query_id}'")
        return self._query_nearest_neighbor_docids[query_id][:size]


class KnnRecallParamSource:
    """Parameter source for the knn-recall runner: passes through the search
    settings plus a shared KnnVectorStore holding the ground-truth neighbors."""

    def __init__(self, track, params, **kwargs):
        # Choose a suitable index: if there is only one defined for this track
        # choose that one, but let the user always override index.
        if len(track.indices) == 1:
            default_index = track.indices[0].name
        else:
            default_index = "_all"
        self._index_name = params.get("index", default_index)
        self._cache = params.get("cache", False)
        self._params = params
        self.infinite = True
        # Fix: dropped the unused `cwd = os.path.dirname(__file__)` computation
        # (the vector store resolves its own data directory). Lazily cache the
        # store so the two bz2 files are not re-read on every params() call.
        self._vector_store = None

    def partition(self, partition_index, total_partitions):
        return self

    def params(self):
        if self._vector_store is None:
            self._vector_store = KnnVectorStore()
        return {
            "index": self._index_name,
            "cache": self._params.get("cache", False),
            "size": self._params.get("k", 10),
            "num_candidates": self._params.get("num-candidates", 50),
            "oversample": self._params.get("oversample", 0),
            "knn_vector_store": self._vector_store,
        }


# Used in tandem with the KnnRecallParamSource.
# Reads the queries, executes knn search and compares the results with the
# true nearest neighbors.
class KnnRecallRunner:
    """Custom Rally runner that measures knn recall against the exact
    nearest neighbors supplied by the KnnVectorStore."""

    def get_knn_query(self, query_vec, k, num_candidates, oversample):
        """Build the search request body for a single knn query."""
        knn_query = {
            "knn": {
                "field": "emb",
                "query_vector": query_vec,
                "k": k,
                "num_candidates": num_candidates,
            }
        }
        if oversample > 0:
            knn_query["knn"]["rescore_vector"] = {"oversample": oversample}
        return {"query": knn_query, "_source": False}

    async def __call__(self, es, params):
        k = params["size"]
        num_candidates = params["num_candidates"]
        index = params["index"]
        request_cache = params["cache"]
        oversample = params["oversample"]
        recall_total = 0
        exact_total = 0
        min_recall = k
        max_recall = 0
        knn_vector_store: KnnVectorStore = params["knn_vector_store"]
        for query_id, query_vector in enumerate(knn_vector_store.get_query_vectors()):
            knn_body = self.get_knn_query(query_vector, k, num_candidates, oversample)
            # Only the doc ids are needed to compute recall, so _source stays
            # disabled (set by get_knn_query) and only docid is fetched.
            knn_body["docvalue_fields"] = ["docid"]
            knn_result = await es.search(
                body=knn_body,
                index=index,
                request_cache=request_cache,
                size=k,
            )
            knn_hits = [hit["fields"]["docid"][0] for hit in knn_result["hits"]["hits"]]
            # get_neighbors_for_query already truncates to k entries.
            true_neighbors = knn_vector_store.get_neighbors_for_query(query_id, k)
            current_recall = len(set(knn_hits).intersection(true_neighbors))
            recall_total += current_recall
            exact_total += len(true_neighbors)
            min_recall = min(min_recall, current_recall)
            max_recall = max(max_recall, current_recall)
        to_return = {
            # Guard against an empty query set to avoid ZeroDivisionError.
            "avg_recall": recall_total / exact_total if exact_total else 0.0,
            "min_recall": min_recall,
            "max_recall": max_recall,
            "k": k,
            "num_candidates": num_candidates,
            "oversample": oversample,
        }
        # Lazy %-formatting keeps the call cheap when INFO logging is disabled.
        logger.info("Recall results: %s", to_return)
        return to_return

    def __repr__(self, *args, **kwargs):
        return "knn-recall"


def register(registry):
    """Register this track's custom parameter sources and runner with Rally."""
    registry.register_param_source("knn-param-source", KnnParamSource)
    registry.register_param_source("knn-recall-param-source", KnnRecallParamSource)
    registry.register_runner("knn-recall", KnnRecallRunner(), async_runner=True)