rally-custom/custom_tracks/elasticsearch/dense_vector/track.py [9:80]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
logger = logging.getLogger(__name__)


def extract_vector_operations_count(knn_result):
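    """Sum the per-shard vector_operations_count values reported in the profile
    section of a kNN search response; shards without the field are skipped."""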
    vector_operations_count = 0
    profile = knn_result["profile"]
    for shard in profile["shards"]:
        assert len(shard["dfs"]["knn"]) == 1
        knn_search = shard["dfs"]["knn"][0]
        if "vector_operations_count" in knn_search:
            vector_operations_count += knn_search["vector_operations_count"]
    return vector_operations_count


def compute_percentile(data: List[Any], percentile):
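    """Return the value at the given percentile of data (nearest-rank estimate),
    or None if data is empty."""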
    size = len(data)
    if size <= 0:
        return None
    sorted_data = sorted(data)
    index = int(round(percentile * size / 100)) - 1
    return sorted_data[max(min(index, size - 1), 0)]


def load_query_vectors(queries_file) -> Dict[int, List[float]]:
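    """Load query vectors from a file containing one JSON-encoded vector per
    line, keyed by zero-based line number."""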
    if not (os.path.exists(queries_file) and os.path.isfile(queries_file)):
        raise ValueError(f"Provided queries file '{queries_file}' does not exist or is not a file")
    query_vectors: Dict[int, List[float]]
    with open(queries_file, "r") as f:
        logger.debug(f"Reading query vectors from '{queries_file}'")
        lines = f.readlines()
        query_vectors = {_index: json.loads(vector) for _index, vector in enumerate(lines)}
        logger.debug(f"Finished reading query vectors from '{queries_file}'")
    return query_vectors


async def extract_exact_neighbors(
    query_vector: List[float], index: str, max_size: int, vector_field: str, request_cache: bool, client
) -> List[str]:
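    """Compute ground-truth nearest neighbors with an exact script_score
    (cosineSimilarity) query and return the IDs of the top max_size hits."""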
    script_query = {
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": f"cosineSimilarity(params.query, '{vector_field}') + 1.0",
                    "params": {"query": query_vector},
                },
            }
        },
        "_source": False,
    }
    script_result = await client.search(
        body=script_query,
        index=index,
        request_cache=request_cache,
        size=max_size,
    )
    return [hit["_id"] for hit in script_result["hits"]["hits"]]


class KnnParamSource:
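    """Rally parameter source producing kNN search requests; the target index,
    request cache, and exact (script-score) scan behaviour are configurable via params."""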
    def __init__(self, track, params, **kwargs):
        # Choose a suitable index: if only one index is defined for this track,
        # use it; otherwise fall back to "_all". The user can always override it
        # via the "index" parameter.
        if len(track.indices) == 1:
            default_index = track.indices[0].name
        else:
            default_index = "_all"

        self._index_name = params.get("index", default_index)
        self._cache = params.get("cache", False)
        self._exact_scan = params.get("exact", False)
        self._params = params
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



rally-custom/custom_tracks/elasticsearch/so_vector/track.py [10:79]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
logger = logging.getLogger(__name__)


def extract_vector_operations_count(knn_result):
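    """Sum the per-shard vector_operations_count values reported in the profile
    section of a kNN search response; shards without the field are skipped."""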
    vector_operations_count = 0
    profile = knn_result["profile"]
    for shard in profile["shards"]:
        assert len(shard["dfs"]["knn"]) == 1
        knn_search = shard["dfs"]["knn"][0]
        if "vector_operations_count" in knn_search:
            vector_operations_count += knn_search["vector_operations_count"]
    return vector_operations_count


def compute_percentile(data: List[Any], percentile):
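    """Return the value at the given percentile of data (nearest-rank estimate),
    or None if data is empty."""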
    size = len(data)
    if size <= 0:
        return None
    sorted_data = sorted(data)
    index = int(round(percentile * size / 100)) - 1
    return sorted_data[max(min(index, size - 1), 0)]


def load_query_vectors(queries_file) -> Dict[int, List[float]]:
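    """Load query vectors from a file containing one JSON-encoded vector per
    line, keyed by zero-based line number."""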
    if not (os.path.exists(queries_file) and os.path.isfile(queries_file)):
        raise ValueError(f"Provided queries file '{queries_file}' does not exist or is not a file")
    query_vectors: Dict[int, List[float]]
    with open(queries_file, "r") as f:
        logger.debug(f"Reading query vectors from '{queries_file}'")
        lines = f.readlines()
        query_vectors = {_index: json.loads(vector) for _index, vector in enumerate(lines)}
        logger.debug(f"Finished reading query vectors from '{queries_file}'")
    return query_vectors


async def extract_exact_neighbors(
    query_vector: List[float], index: str, max_size: int, vector_field: str, request_cache: bool, client
) -> List[str]:
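    """Compute ground-truth nearest neighbors with an exact script_score
    (cosineSimilarity) query and return the IDs of the top max_size hits."""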
    script_query = {
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": f"cosineSimilarity(params.query, '{vector_field}') + 1.0",
                    "params": {"query": query_vector},
                },
            }
        },
        "_source": False,
    }
    script_result = await client.search(
        body=script_query,
        index=index,
        request_cache=request_cache,
        size=max_size,
    )
    return [hit["_id"] for hit in script_result["hits"]["hits"]]


class KnnParamSource:
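    """Rally parameter source producing kNN search requests; the target index,
    request cache, and exact (script-score) scan behaviour are configurable via params."""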
    def __init__(self, track, params, **kwargs):
        # Choose a suitable index: if only one index is defined for this track,
        # use it; otherwise fall back to "_all". The user can always override it
        # via the "index" parameter.
        if len(track.indices) == 1:
            default_index = track.indices[0].name
        else:
            default_index = "_all"

        self._index_name = params.get("index", default_index)
        self._cache = params.get("cache", False)
        self._exact_scan = params.get("exact", False)
        self._params = params
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



