random_vector/track.py (71 lines of code) (raw):
import random
from esrally.track.params import ParamSource
class RandomBulkParamSource(ParamSource):
    """Parameter source that produces bulk-index requests of random vector documents.

    Recognized ``params`` keys (all optional):

    * ``bulk-size``: number of documents per bulk request (default 1000).
    * ``index``: target index name (default: name of the track's first index).
    * ``dims``: number of components in each generated vector (default 128).
    * ``partitions``: inclusive upper bound of the random ``partition_id``
      (default 1000).
    """

    def __init__(self, track, params, **kwargs):
        super().__init__(track, params, **kwargs)
        self._bulk_size = params.get("bulk-size", 1000)
        # Only touch track.indices when no explicit index was given; evaluating
        # track.indices[0] unconditionally raises IndexError on a track that
        # declares no indices even though the caller supplied "index".
        if "index" in params:
            self._index_name = params["index"]
        else:
            self._index_name = track.indices[0].name
        self._dims = params.get("dims", 128)
        self._partitions = params.get("partitions", 1000)

    def params(self):
        """Build the parameters for one bulk request.

        Returns:
            A dict whose ``body`` is a flat list alternating an ``index`` action
            line and the document it applies to, plus the bulk size, unit,
            index name and an empty ``type``.
        """
        # Imported lazily, as in the rest of this file, so numpy is only
        # required once parameters are actually generated.
        import numpy as np

        bulk_data = []
        for _ in range(self._bulk_size):
            vec = np.random.rand(self._dims)
            # random.randint is inclusive on both ends, so ids span
            # 0..partitions (partitions + 1 distinct values). The search
            # parameter source in this file draws from the same range.
            partition_id = random.randint(0, self._partitions)
            # Action line: the partition id doubles as the routing value.
            bulk_data.append({"index": {"_index": self._index_name, "routing": partition_id}})
            # Document line: tolist() turns the ndarray into plain Python floats.
            bulk_data.append({"partition_id": partition_id, "emb": vec.tolist()})

        return {
            "body": bulk_data,
            "bulk-size": self._bulk_size,
            # Every document in the body is preceded by its action line.
            "action-metadata-present": True,
            "unit": "docs",
            "index": self._index_name,
            "type": "",
        }
def generate_knn_query(query_vector, partition_id, k):
    """Return a search body holding a filtered ``knn`` clause on the ``emb`` field.

    Args:
        query_vector: the vector to search for.
        partition_id: value the ``partition_id`` term filter must match.
        k: used for both ``k`` and ``num_candidates``.

    Returns:
        A dict of the form ``{"knn": {...}}``.
    """
    partition_filter = {"term": {"partition_id": partition_id}}
    knn_clause = dict(
        field="emb",
        query_vector=query_vector,
        k=k,
        num_candidates=k,
        filter=partition_filter,
    )
    return {"knn": knn_clause}
def generate_script_query(query_vector, partition_id):
    """Return a search body that scores one partition's documents by cosine similarity.

    Args:
        query_vector: passed to the script as ``params.query_vector``.
        partition_id: value the ``partition_id`` term query must match.

    Returns:
        A dict of the form ``{"query": {"script_score": {...}}}``.
    """
    partition_query = {"term": {"partition_id": partition_id}}
    scoring_script = {
        "source": "cosineSimilarity(params.query_vector, 'emb') + 1.0",
        "params": {"query_vector": query_vector},
    }
    script_score = {"query": partition_query, "script": scoring_script}
    return {"query": {"script_score": script_score}}
class RandomSearchParamSource:
    """Parameter source that produces partition-filtered random vector searches.

    Recognized ``params`` keys (all optional):

    * ``index``: index to search (default: the track's only index, or ``_all``
      when the track defines zero or several indices).
    * ``cache``: value passed through as ``cache`` in the result (default False).
    * ``partitions``: inclusive upper bound of the random ``partition_id``
      (default 1000).
    * ``dims``: number of components in the query vector (default 128).
    * ``k``: result ``size`` and, for kNN queries, ``k`` (default 10).
    * ``script``: when true build a ``script_score`` query, otherwise a ``knn``
      query (default True).
    """

    def __init__(self, track, params, **kwargs):
        # choose a suitable index: if there is only one defined for this track
        # choose that one, but let the user always override index
        if len(track.indices) == 1:
            default_index = track.indices[0].name
        else:
            default_index = "_all"
        self._index_name = params.get("index", default_index)
        self._cache = params.get("cache", False)
        self._partitions = params.get("partitions", 1000)
        self._dims = params.get("dims", 128)
        self._top_k = params.get("k", 10)
        self._script = params.get("script", True)
        # NOTE(review): presumably read by Rally to treat this source as never
        # exhausted -- confirm against the Rally parameter-source contract.
        self.infinite = True

    def partition(self, partition_index, total_partitions):
        """Return this same instance regardless of the requested partition."""
        return self

    def params(self):
        """Build the parameters for one search request.

        Returns:
            A dict with the index name, cache flag, result size, the excluded
            ``emb`` source field and the query ``body``.
        """
        import numpy as np

        # random.randint is inclusive on both ends: ids span 0..partitions.
        partition_id = random.randint(0, self._partitions)
        query_vec = np.random.rand(self._dims).tolist()
        if self._script:
            query = generate_script_query(query_vec, partition_id)
        else:
            # Must read the attribute set in __init__ (``_top_k``); the former
            # ``_topk`` spelling raised AttributeError on every kNN request.
            query = generate_knn_query(query_vec, partition_id, self._top_k)
        return {
            "index": self._index_name,
            "cache": self._cache,
            "size": self._top_k,
            "_source_excludes": ["emb"],
            "body": query,
        }
def register(registry):
    """Register this file's two parameter sources on ``registry`` under their names."""
    param_sources = (
        ("random-bulk-param-source", RandomBulkParamSource),
        ("knn-param-source", RandomSearchParamSource),
    )
    for source_name, source_class in param_sources:
        registry.register_param_source(source_name, source_class)