wikipedia/track.py (239 lines of code) (raw):

import csv import math import random import re from os import getcwd from os.path import dirname from typing import Iterator, List from esrally.track.params import ParamSource QUERIES_DIRNAME: str = dirname(__file__) QUERIES_FILENAME: str = f"{QUERIES_DIRNAME}/queries.csv" SAMPLE_IDS_FILENAME: str = f"{QUERIES_DIRNAME}/ids.txt" SEARCH_APPLICATION_ROOT_ENDPOINT: str = "/_application/search_application" QUERY_RULES_ENDPOINT: str = "/_query_rules" QUERY_CLEAN_REXEXP = regexp = re.compile("[^0-9a-zA-Z]+") def query_samples(k: int, random_seed: int = None) -> List[str]: with open(QUERIES_FILENAME) as queries_file: csv_reader = csv.reader(queries_file) next(csv_reader) queries_with_probabilities = list(tuple(line) for line in csv_reader) queries = [QUERY_CLEAN_REXEXP.sub(" ", query).lower() for query, _ in queries_with_probabilities] probabilities = [float(probability) for _, probability in queries_with_probabilities] random.seed(random_seed) return random.choices(queries, weights=probabilities, k=k) # ids file was created with the following command: grep _index pages-1k.json | jq .index._id | tr -d '"' | grep -v null > ids.txt def ids_samples() -> List[str]: with open(SAMPLE_IDS_FILENAME, "r") as file: ids = {line.strip() for line in file} for i in range(100): ids.add(f"missing-id-{i}") return list(ids) class SearchApplicationParams: def __init__(self, track, params): self.indices = params.get("indices", track.index_names()) self.name = params.get("search-application", f"{self.indices[0]}-search-application") class CreateSearchApplicationParamSource(ParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self.search_application_params = SearchApplicationParams(track, params) def partition(self, partition_index, total_partitions): return self def params(self): return { "method": "PUT", "path": f"{SEARCH_APPLICATION_ROOT_ENDPOINT}/{self.search_application_params.name}", "body": {"indices": self.search_application_params.indices}, } class QueryRulesetParams: def __init__(self, track, params): self.indices = params.get("indices", track.index_names()) self.ruleset_id = params.get("ruleset_id") self.ruleset_size = params.get("ruleset_size") class QueryIteratorParamSource(ParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self._batch_size = self._params.get("batch_size", 100000) self._random_seed = self._params.get("seed", None) self._sample_queries = query_samples(self._batch_size, self._random_seed) self._queries_iterator = None def size(self): return None def partition(self, partition_index, total_partitions): if self._queries_iterator is None: self._queries_iterator = iter(self._sample_queries) return self class SearchApplicationSearchParamSource(QueryIteratorParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self.search_application_params = SearchApplicationParams(track, params) def params(self): try: query = next(self._queries_iterator) return { "method": "POST", "path": f"{SEARCH_APPLICATION_ROOT_ENDPOINT}/{self.search_application_params.name}/_search", "body": { "params": { "query_string": query, }, }, } except StopIteration: self._queries_iterator = iter(self._sample_queries) return self.params() class CreateQueryRulesetParamSource(ParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self.query_ruleset_params = QueryRulesetParams(track, params) def partition(self, partition_index, total_partitions): return self def params(self): ids = ids_samples() rules = [] for i in range(self.query_ruleset_params.ruleset_size): rule = { "rule_id": "rule_{{i}}", "type": random.choice(["pinned", "exclude"]), "criteria": [{"type": "exact", "metadata": "rule_key", "values": [random.choice(["match", "no-match"])]}], "actions": {"ids": [random.choice(ids)]}, } rules.append(rule) return {"method": "PUT", "path": f"{QUERY_RULES_ENDPOINT}/{self.query_ruleset_params.ruleset_id}", "body": {"rules": rules}} class QueryRulesSearchParamSource(QueryIteratorParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self.query_ruleset_params = QueryRulesetParams(track, params) def params(self): try: query = next(self._queries_iterator) return { "method": "POST", "path": "/_search", "body": { "query": { "rule": { "match_criteria": {"rule_key": random.choice(["match", "no-match"])}, "ruleset_ids": [self.query_ruleset_params.ruleset_id], "organic": {"query_string": {"query": query, "default_field": self._params["search-fields"]}}, } }, "size": self._params["size"], }, } except StopIteration: self._queries_iterator = iter(self._sample_queries) return self.params() class PinnedSearchParamSource(QueryIteratorParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self.query_ruleset_params = QueryRulesetParams(track, params) self.ids = ids_samples() def params(self): try: query = next(self._queries_iterator) return { "method": "POST", "path": "/_search", "body": { "query": { "pinned": { "organic": {"query_string": {"query": query, "default_field": self._params["search-fields"]}}, "ids": [random.choice(self.ids)], } }, "size": self._params["size"], }, } except StopIteration: self._queries_iterator = iter(self._sample_queries) return self.params() class RetrieverParamSource(QueryIteratorParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self._index_name = params.get("index", track.indices[0].name if len(track.indices) == 1 else "_all") self._search_fields = self._params["search-fields"] self._rerank = params.get("rerank", False) self._reranker = params.get("reranker", "random_reranker") self._size = params.get("size", 20) def params(self): standard_retriever = { "standard": {"query": {"query_string": {"query": next(self._queries_iterator), "default_field": self._search_fields}}} } retriever = standard_retriever if self._rerank: retriever = {self._reranker: {"retriever": standard_retriever, "field": self._search_fields, "rank_window_size": self._size}} try: return { "method": "POST", "path": f"/{self._index_name}/_search", "body": {"retriever": retriever, "size": self._size}, } except StopIteration: self._queries_iterator = iter(self._sample_queries) return self.params() # TODO Add other queries, check default fields for search. Compare them with other DSL queries class EsqlSearchParamSource(QueryIteratorParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self._index_name = params.get("index", track.indices[0].name if len(track.indices) == 1 else "_all") self._search_fields = self._params["search-fields"] self._size = params.get("size", 20) self._query_type = self._params["query-type"] def params(self): try: query = next(self._queries_iterator) if self._query_type == "query-string": query_body = f'QSTR("{ query }", {{"default_field": "{ self._search_fields }" }})' elif self._query_type == "match": query_body = f'MATCH(title, "{ query }") OR MATCH(content, "{ query }")' elif self._query_type == "kql": query_body = f'KQL("{ self._search_fields }:{ query }")' elif self._query_type == "term": query_body = f'TERM(title, "{ query }") OR TERM(content, "{ query }")' else: raise ValueError("Unknown query type: " + self._query_type) return { "query": f"FROM {self._index_name} METADATA _score | WHERE { query_body } | KEEP title, _score | SORT _score DESC | LIMIT { self._size }", } except StopIteration: self._queries_iterator = iter(self._sample_queries) return self.params() class QueryParamSource(QueryIteratorParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self._index_name = params.get("index", track.indices[0].name if len(track.indices) == 1 else "_all") self._cache = params.get("cache", False) self._query_type = self._params["query-type"] def params(self): try: query = next(self._queries_iterator) if self._query_type == "query-string": query_body = {"query_string": {"query": query, "default_field": self._params["search-fields"]}} elif self._query_type == "kql": query_body = {"kql": {"query": f'{ self._params["search-fields"] }:"{ query }"'}} elif self._query_type == "match": query_body = {"match": {"content": query}} elif self._query_type == "multi_match": query_body = {"bool": {"should": [{"match": {"title": query}}, {"match": {"content": query}}]}} elif self._query_type == "term": query_body = {"bool": {"should": [{"term": {"title": query}}, {"term": {"content": query}}]}} else: raise ValueError("Unknown query type: " + self._query_type) except StopIteration: self._queries_iterator = iter(self._sample_queries) return self.params() return { "body": { "query": query_body, "size": self._params["size"], }, "index": self._index_name, "cache": self._cache, } def register(registry): registry.register_param_source("query-search", QueryParamSource) registry.register_param_source("create-search-application-param-source", CreateSearchApplicationParamSource) registry.register_param_source("search-application-search-param-source", SearchApplicationSearchParamSource) registry.register_param_source("create-query-ruleset-param-source", CreateQueryRulesetParamSource) registry.register_param_source("query-rules-search-param-source", QueryRulesSearchParamSource) registry.register_param_source("pinned-search-param-source", PinnedSearchParamSource) registry.register_param_source("retriever-search", RetrieverParamSource) registry.register_param_source("esql-search", EsqlSearchParamSource)