esrally/utils/opts.py (170 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import difflib
import json
import re

from esrally.utils import io

# detect (very simplistically) json that starts with an empty array or array of strings
RE_JSON_ARRAY_START = re.compile(r'^(\s*\[\s*\])|(\s*\[\s*".*)')


def csv_to_list(csv):
    """
    Convert a comma separated string, a JSON array string or a path to a ``.json`` file into a list.

    :param csv: The raw argument value or ``None``.
    :return: ``None`` if ``csv`` is ``None``, an empty list for a blank string, otherwise the parsed list.
    :raises ValueError: If a ``.json`` file is supplied whose content does not look like a JSON array.
    """
    if csv is None:
        return None
    if io.has_extension(csv, ".json"):
        with open(io.normalize_path(csv), encoding="utf-8") as f:
            content = f.read()
        if not RE_JSON_ARRAY_START.match(content):
            raise ValueError(f"csv args only support arrays in json but you supplied [{csv}]")
        return json.loads(content)
    if RE_JSON_ARRAY_START.match(csv):
        return json.loads(csv)
    if not csv.strip():
        return []
    return [e.strip() for e in csv.split(",")]


def to_bool(v):
    """
    Convert the strings "true" / "false" (case-insensitive) to a boolean.

    :param v: A string or ``None``.
    :return: ``None`` if ``v`` is ``None``, otherwise the matching boolean.
    :raises ValueError: If ``v`` is neither "true" nor "false".
    """
    if v is None:
        return None
    lowered = v.lower()
    if lowered == "false":
        return False
    if lowered == "true":
        return True
    raise ValueError(f"Could not convert value '{v}'")


def to_none(v):
    """
    Convert the string "none" (case-insensitive) to ``None``.

    :param v: A string or ``None``.
    :return: Always ``None`` for accepted input.
    :raises ValueError: If ``v`` is a string other than "none".
    """
    if v is None or v.lower() == "none":
        return None
    raise ValueError(f"Could not convert value '{v}'")


def kv_to_map(kvs):
    """
    Convert a list of ``key:value`` strings into a dict.

    Keys are always kept as (stripped) strings. Values are converted, in this order, to: an explicit string
    (when starting with a single quote), int, float, bool, ``None`` or - as a fallback - the unchanged string.

    :param kvs: An iterable of ``key:value`` strings.
    :return: A dict with the converted values.
    :raises ValueError: If an entry does not contain a colon.
    """

    def convert(v):
        # string (specified explicitly)
        if v.startswith("'"):
            return v[1:-1]
        # try the typed conversions from most to least specific
        for conversion in (int, float, to_bool, to_none):
            try:
                return conversion(v)
            except ValueError:
                pass
        # treat it as string by default
        return v

    result = {}
    for kv in kvs:
        # only split on the first colon so that values may contain colons themselves (e.g. passwords or URLs)
        k, separator, v = kv.partition(":")
        if not separator:
            raise ValueError(f"Could not convert '{kv}' to a key-value pair. Expected format is 'key:value'.")
        # key is always considered a string, value needs to be converted
        result[k.strip()] = convert(v.strip())
    return result


def to_dict(arg, default_parser=kv_to_map):
    """
    Parse ``arg`` as JSON (inline or from a ``.json`` file) and fall back to ``default_parser`` otherwise.

    :param arg: A path to a ``.json`` file, a JSON string or a comma separated string.
    :param default_parser: Callable that receives the result of ``csv_to_list(arg)`` if ``arg`` is not valid JSON.
    :return: The parsed JSON value or whatever ``default_parser`` returns.
    """
    if io.has_extension(arg, ".json"):
        with open(io.normalize_path(arg), encoding="utf-8") as f:
            return json.load(f)
    try:
        return json.loads(arg)
    except json.decoder.JSONDecodeError:
        return default_parser(csv_to_list(arg))


def bulleted_list_of(src_list):
    """Return the items of ``src_list`` as strings prefixed with "- "."""
    return [f"- {param}" for param in src_list]


def double_quoted_list_of(src_list):
    """Return the items of ``src_list`` as strings wrapped in double quotes."""
    return [f'"{param}"' for param in src_list]


def make_list_of_close_matches(word_list, all_possibilities):
    """
    Returns list of closest matches for `word_list` from `all_possibilities`.
    e.g. [num_of-shards] will return [num_of_shards] when all_possibilities=["num_of_shards", "num_of_replicas"]

    :param word_list: A list of strings that we want to find closest matches for.
    :param all_possibilities: List of strings that the algorithm will calculate the closest match from.
    :return: A list with the single best match for every word that has one; words without a match are skipped.
    """
    close_matches = []
    for param in word_list:
        matched_word = difflib.get_close_matches(param, all_possibilities, n=1)
        if matched_word:
            close_matches.append(matched_word[0])
    return close_matches


class ConnectOptions:
    """
    Base Class to help either parsing --target-hosts or --client-options
    """

    def __getitem__(self, key):
        """
        Race expects the cfg object to be subscriptable
        Just return 'default'
        """
        # the key is deliberately ignored
        return self.default

    @property
    def default(self):
        """Return a list with the options assigned to the 'default' key"""
        return self.parsed_options["default"]

    @property
    def all_options(self):
        """Return a dict with all parsed options"""
        return self.parsed_options


class TargetHosts(ConnectOptions):
    """Parses the value of --target-hosts into a dict of cluster name -> list of host dicts."""

    DEFAULT = "default"

    def __init__(self, argvalue):
        self.argname = "--target-hosts"
        self.argvalue = argvalue
        self.parsed_options = []

        self.parse_options()

    @classmethod
    def _normalize_hosts(cls, hosts):
        """
        Normalize host specifications to a list of dicts.

        :param hosts: ``None``, a single host string or a list whose items are host strings or other objects.
        :return: A list with one dict per host string; items that are not strings are passed through unchanged.
        """
        # pylint: disable=import-outside-toplevel
        from urllib.parse import unquote, urlparse

        string_types = str, bytes
        # if hosts are empty, just defer to defaults down the line
        if hosts is None:
            return [{}]

        # passed in just one string
        if isinstance(hosts, string_types):
            hosts = [hosts]

        out = []
        # normalize hosts to dicts
        for host in hosts:
            if not isinstance(host, string_types):
                out.append(host)
                continue

            if "://" not in host:
                # urlparse only detects the network location if it is introduced by "//"
                host = f"//{host}"

            parsed_url = urlparse(host)
            h = {"host": parsed_url.hostname}

            if parsed_url.port:
                h["port"] = parsed_url.port

            if parsed_url.scheme == "https":
                h["port"] = parsed_url.port or 443
                h["use_ssl"] = True

            if parsed_url.username or parsed_url.password:
                # either component may be None (e.g. "user@host"); unquote() does not accept None
                username = unquote(parsed_url.username or "")
                password = unquote(parsed_url.password or "")
                h["http_auth"] = f"{username}:{password}"

            if parsed_url.path and parsed_url.path != "/":
                h["url_prefix"] = parsed_url.path

            out.append(h)
        return out

    def parse_options(self):
        """Parse ``self.argvalue`` and store the normalized hosts per cluster name in ``self.parsed_options``."""

        def normalize_to_dict(arg):
            """
            Return parsed comma separated host string as dict with "default" key.
            This is needed to support backwards compatible --target-hosts for single clusters that are not
            defined as a json string or file.
            """
            return {TargetHosts.DEFAULT: self._normalize_hosts(arg)}

        parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict)
        self.parsed_options = {cluster_name: self._normalize_hosts(nodes) for cluster_name, nodes in parsed_options.items()}

    @property
    def all_hosts(self):
        """Return a dict with all parsed options"""
        return self.all_options


class ClientOptions(ConnectOptions):
    """
    Convert --client-options arg to a dict.
    When no --client-options have been specified but multi-cluster --target-hosts are used,
    apply options defaults for all cluster names.
    """

    DEFAULT_CLIENT_OPTIONS = "timeout:60"

    def __init__(self, argvalue, target_hosts=None):
        self.argname = "--client-options"
        self.argvalue = argvalue
        self.target_hosts = target_hosts
        self.parsed_options = []

        self.parse_options()

    def parse_options(self):
        """Parse ``self.argvalue`` and store the client options per cluster name in ``self.parsed_options``."""
        if self.argvalue == ClientOptions.DEFAULT_CLIENT_OPTIONS and self.target_hosts is not None:
            # --client-options unset but multi-clusters used in --target-hosts? apply options defaults for all cluster names.
            default_client_map = kv_to_map([ClientOptions.DEFAULT_CLIENT_OPTIONS])
            # every cluster gets its own copy so that changing one cluster's options cannot leak into the others
            self.parsed_options = {cluster_name: dict(default_client_map) for cluster_name in self.target_hosts.all_hosts}
        else:
            self.parsed_options = to_dict(self.argvalue, default_parser=ClientOptions.normalize_to_dict)

    @staticmethod
    def normalize_to_dict(arg):
        """
        When --client-options is a non-json csv string (single cluster mode),
        return parsed client options as dict with "default" key
        This is needed to support single cluster use of --client-options when not
        defined as a json string or file.
        """
        default_client_map = kv_to_map([ClientOptions.DEFAULT_CLIENT_OPTIONS])
        # explicitly provided options take precedence over the defaults
        return {TargetHosts.DEFAULT: {**default_client_map, **kv_to_map(arg)}}

    @property
    def all_client_options(self):
        """Return a dict with all client options"""
        return self.all_options

    @property
    def uses_static_responses(self):
        """Return the value of "static_responses" of the default cluster's options (``False`` if absent)."""
        return self.default.get("static_responses", False)

    def with_max_connections(self, max_connections):
        """
        Return a copy of all client options with "max_connections" set for every cluster.

        An explicitly configured "max_connections" is preferred over the ``max_connections`` parameter; the
        resulting value is never lower than 256. The stored options are not modified.

        :param max_connections: The value to use for clusters that do not define "max_connections".
        :return: A new dict of cluster name -> amended client options.
        """
        final_client_options = {}
        for cluster, original_opts in self.all_client_options.items():
            amended_opts = dict(original_opts)
            amended_opts["max_connections"] = max(256, amended_opts.get("max_connections", max_connections))
            final_client_options[cluster] = amended_opts
        return final_client_options