rally-custom/custom_tracks/elasticsearch/dense_vector/track.py [128:170]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class KnnVectorStore:
    @staticmethod
    def empty_store():
        return defaultdict(lambda: defaultdict(list))

    def __init__(self, queries_file: str, vector_field: str):
        assert queries_file and vector_field
        self._query_vectors = load_query_vectors(queries_file)
        self._vector_field = vector_field
        self._store = KnnVectorStore.empty_store()

    async def get_neighbors_for_query(self, index: str, query_id: int, size: int, request_cache: bool, client) -> List[str]:
        try:
            logger.debug(f"Fetching exact neighbors for {query_id} from in-memory store")
            exact_neighbors = self._store[index][query_id]
            if not exact_neighbors or len(exact_neighbors) < size:
                logger.debug(f"Query vector with id {query_id} not cached or has fewer then {size} requested results - computing neighbors")
                self._store[index][query_id] = await self.load_exact_neighbors(index, query_id, size, request_cache, client)
                logger.debug(f"Finished computing exact neighbors for {query_id} - it's now cached!")
            return self._store[index][query_id]
        except Exception as ex:
            logger.exception(f"Failed to compute nearest neighbors for '{query_id}'. Returning empty results instead.", ex)
            return []

    async def load_exact_neighbors(self, index: str, query_id: int, max_size: int, request_cache: bool, client):
        if query_id not in self._query_vectors:
            raise ValueError(f"Unknown query with id: '{query_id}' provided")
        return await extract_exact_neighbors(self._query_vectors[query_id], index, max_size, self._vector_field, request_cache, client)

    def invalidate_all(self):
        logger.info("Invalidating all entries from knn-vector-store")
        self._store = KnnVectorStore.empty_store()

    def get_query_vectors(self) -> Dict[int, List[float]]:
        if len(self._query_vectors) == 0:
            raise ValueError("Query vectors have not been initialized.")
        return self._query_vectors

    @classmethod
    @functools.lru_cache(maxsize=1)
    def get_instance(cls, queries_file: str, vector_field):
        logger.info(f"Initializing KnnVectorStore for queries file: '{queries_file}' and vector field: '{vector_field}'")
        return KnnVectorStore(queries_file, vector_field)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



rally-custom/custom_tracks/opensearch/dense_vector/track.py [73:115]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class KnnVectorStore:
    @staticmethod
    def empty_store():
        return defaultdict(lambda: defaultdict(list))

    def __init__(self, queries_file: str, vector_field: str):
        assert queries_file and vector_field
        self._query_vectors = load_query_vectors(queries_file)
        self._vector_field = vector_field
        self._store = KnnVectorStore.empty_store()

    async def get_neighbors_for_query(self, index: str, query_id: int, size: int, request_cache: bool, client) -> List[str]:
        try:
            logger.debug(f"Fetching exact neighbors for {query_id} from in-memory store")
            exact_neighbors = self._store[index][query_id]
            if not exact_neighbors or len(exact_neighbors) < size:
                logger.debug(f"Query vector with id {query_id} not cached or has fewer then {size} requested results - computing neighbors")
                self._store[index][query_id] = await self.load_exact_neighbors(index, query_id, size, request_cache, client)
                logger.debug(f"Finished computing exact neighbors for {query_id} - it's now cached!")
            return self._store[index][query_id]
        except Exception as ex:
            logger.exception(f"Failed to compute nearest neighbors for '{query_id}'. Returning empty results instead.", ex)
            return []

    async def load_exact_neighbors(self, index: str, query_id: int, max_size: int, request_cache: bool, client):
        if query_id not in self._query_vectors:
            raise ValueError(f"Unknown query with id: '{query_id}' provided")
        return await extract_exact_neighbors(self._query_vectors[query_id], index, max_size, self._vector_field, request_cache, client)

    def invalidate_all(self):
        logger.info("Invalidating all entries from knn-vector-store")
        self._store = KnnVectorStore.empty_store()

    def get_query_vectors(self) -> Dict[int, List[float]]:
        if len(self._query_vectors) == 0:
            raise ValueError("Query vectors have not been initialized.")
        return self._query_vectors

    @classmethod
    @functools.lru_cache(maxsize=1)
    def get_instance(cls, queries_file: str, vector_field):
        logger.info(f"Initializing KnnVectorStore for queries file: '{queries_file}' and vector field: '{vector_field}'")
        return KnnVectorStore(queries_file, vector_field)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



