esrally/utils/opts.py (170 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import difflib
import json
import re
from esrally.utils import io
# Detects (very simplistically) JSON text that starts with an empty array or with an array of strings.
# It is applied with ``.match()``, i.e. only the beginning of the input is inspected; a match does
# not guarantee that the whole text is valid JSON.
RE_JSON_ARRAY_START = re.compile(r'^(\s*\[\s*\])|(\s*\[\s*".*)')
def csv_to_list(csv):
    """
    Convert a command line argument into a list.

    The argument is interpreted, in this order, as:

    * ``None``, which is passed through unchanged,
    * a path ending in ``.json`` whose content must look like a JSON array,
    * an inline JSON array (empty or starting with a string),
    * a blank string, which yields an empty list,
    * a comma-separated string whose items are whitespace-stripped.

    :param csv: The raw argument value or ``None``.
    :return: ``None`` if ``csv`` is ``None``, otherwise the parsed value.
    :raises ValueError: If a ``.json`` file is supplied whose content does not start like a JSON array.
    """
    if csv is None:
        return None

    if io.has_extension(csv, ".json"):
        with open(io.normalize_path(csv), encoding="utf-8") as json_file:
            raw = json_file.read()
        if RE_JSON_ARRAY_START.match(raw):
            return json.loads(raw)
        raise ValueError(f"csv args only support arrays in json but you supplied [{csv}]")

    if RE_JSON_ARRAY_START.match(csv):
        return json.loads(csv)

    if not csv.strip():
        return []
    return [item.strip() for item in csv.split(",")]
def to_bool(v):
    """
    Convert the strings "true" and "false" (in any letter case) to the matching boolean.

    :param v: A string or ``None``.
    :return: ``None`` if ``v`` is ``None``, otherwise ``True`` or ``False``.
    :raises ValueError: If ``v`` is neither "true" nor "false".
    """
    if v is None:
        return None

    lowered = v.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    raise ValueError("Could not convert value '%s'" % v)
def to_none(v):
    """
    Convert the string "none" (in any letter case) to ``None``.

    :param v: A string or ``None``.
    :return: Always ``None`` (for ``None`` itself and for any spelling of "none").
    :raises ValueError: If ``v`` is a string other than "none".
    """
    if v is not None and v.lower() != "none":
        raise ValueError("Could not convert value '%s'" % v)
    return None
def kv_to_map(kvs):
    """
    Convert a list of ``key:value`` strings into a dict with typed values.

    Keys are always kept as whitespace-stripped strings. Each value is whitespace-stripped and then
    converted by the first rule that applies:

    * a value starting with a single quote is an explicit string; its first and last character are dropped,
    * an integer literal becomes ``int``,
    * a float literal becomes ``float``,
    * "true" / "false" (any letter case) become ``bool``,
    * "none" (any letter case) becomes ``None``,
    * anything else is kept as a string.

    Only the first colon separates key and value, so values may contain colons themselves
    (e.g. ``url:'http://localhost:9200'``).

    :param kvs: An iterable of ``key:value`` strings.
    :return: A dict mapping each key to its converted value.
    :raises ValueError: If an entry does not contain a colon.
    """

    def convert(v):
        # string (specified explicitly)
        if v.startswith("'"):
            return v[1:-1]
        # int
        try:
            return int(v)
        except ValueError:
            pass
        # float
        try:
            return float(v)
        except ValueError:
            pass
        # boolean
        try:
            return to_bool(v)
        except ValueError:
            pass
        # none
        try:
            return to_none(v)
        except ValueError:
            pass
        # treat it as string by default
        return v

    result = {}
    for kv in kvs:
        # partition splits on the first colon only; any further colons belong to the value
        k, separator, v = kv.partition(":")
        if not separator:
            raise ValueError(f"Could not convert '{kv}': expected a key and a value separated by ':'")
        # key is always considered a string, value needs to be converted
        result[k.strip()] = convert(v.strip())
    return result
def to_dict(arg, default_parser=kv_to_map):
    """
    Parse a command line argument that is either JSON or a comma-separated string.

    If ``arg`` has a ``.json`` extension it is treated as a file path and the file content is parsed
    as JSON. Otherwise ``arg`` itself is parsed as JSON; if that fails it is split with
    ``csv_to_list`` and the resulting list is handed to ``default_parser``.

    :param arg: A path to a JSON file, a JSON string or a comma-separated string.
    :param default_parser: Callable that receives the list produced by ``csv_to_list`` when ``arg`` is not JSON.
    :return: The parsed JSON value or whatever ``default_parser`` returns.
    """
    if not io.has_extension(arg, ".json"):
        try:
            return json.loads(arg)
        except json.decoder.JSONDecodeError:
            return default_parser(csv_to_list(arg))

    with open(io.normalize_path(arg), encoding="utf-8") as json_file:
        return json.load(json_file)
def bulleted_list_of(src_list):
    """Return a new list in which every element of ``src_list`` is rendered as ``- <element>``."""
    bullets = []
    for entry in src_list:
        bullets.append(f"- {entry}")
    return bullets
def double_quoted_list_of(src_list):
    """Return a new list in which every element of ``src_list`` is wrapped in double quotes."""
    quoted = []
    for entry in src_list:
        quoted.append(f'"{entry}"')
    return quoted
def make_list_of_close_matches(word_list, all_possibilities):
    """
    Look up the single closest match in `all_possibilities` for every entry of `word_list`.

    e.g. [num_of-shards] will return [num_of_shards] when all_possibilities=["num_of_shards", "num_of_replicas"]

    :param word_list: A list of strings that we want to find closest matches for.
    :param all_possibilities: List of strings that the closest match is chosen from.
    :return: A list with the best match of each word, in the order of `word_list`. Words without any
             close match are left out, so the result may be shorter than `word_list`.
    """
    # at most one candidate per word (n=1); an empty candidate list means "no close match"
    candidates_per_word = (difflib.get_close_matches(word, all_possibilities, n=1) for word in word_list)
    return [candidates[0] for candidates in candidates_per_word if candidates]
class ConnectOptions:
    """
    Common base class for parsing either --target-hosts or --client-options.

    The accessors below read ``self.parsed_options``; this class does not set that attribute itself.
    """

    @property
    def all_options(self):
        """Return all parsed options, i.e. ``parsed_options`` unchanged."""
        return self.parsed_options

    @property
    def default(self):
        """Return the options assigned to the 'default' key."""
        return self.parsed_options["default"]

    def __getitem__(self, key):
        """
        Make instances subscriptable, which Race expects of the cfg object.

        The key is ignored; the options of the 'default' entry are always returned.
        """
        return self.default
class TargetHosts(ConnectOptions):
    """
    Parses the value of --target-hosts into a dict that maps a cluster name to a list of host dicts.

    A value that is not JSON (a plain comma-separated host list) is stored under the "default" cluster name.
    """

    DEFAULT = "default"

    def __init__(self, argvalue):
        """
        :param argvalue: The raw value of --target-hosts (comma-separated hosts, a JSON string or a path to a JSON file).
        """
        self.argname = "--target-hosts"
        self.argvalue = argvalue
        self.parsed_options = []
        self.parse_options()

    @classmethod
    def _normalize_hosts(cls, hosts):
        """
        Turn host strings into host dicts.

        Each string is parsed as a URL (a missing scheme is tolerated) and converted to a dict with the
        key "host" and, depending on the URL, "port", "use_ssl", "http_auth" and "url_prefix".
        Entries that are not strings are passed through unchanged.

        :param hosts: ``None``, a single host string or a list of host strings / host dicts.
        :return: A list of host dicts; ``[{}]`` if ``hosts`` is ``None``.
        """
        # pylint: disable=import-outside-toplevel
        from urllib.parse import unquote, urlparse

        # NOTE(review): bytes pass this type check, but the str operations below would reject them -
        # confirm whether bytes input is still expected.
        string_types = str, bytes

        # if hosts are empty, just defer to defaults down the line
        if hosts is None:
            return [{}]

        # passed in just one string
        if isinstance(hosts, string_types):
            hosts = [hosts]

        out = []
        # normalize hosts to dicts
        for host in hosts:
            if not isinstance(host, string_types):
                out.append(host)
                continue

            # urlparse only recognizes the network location if it is introduced by "//"
            if "://" not in host:
                host = "//%s" % host

            parsed_url = urlparse(host)
            h = {"host": parsed_url.hostname}

            if parsed_url.port:
                h["port"] = parsed_url.port

            if parsed_url.scheme == "https":
                h["port"] = parsed_url.port or 443
                h["use_ssl"] = True

            if parsed_url.username or parsed_url.password:
                # urlparse reports a missing user name or password as None, which unquote cannot
                # handle; treat the missing part as empty.
                h["http_auth"] = "%s:%s" % (
                    unquote(parsed_url.username or ""),
                    unquote(parsed_url.password or ""),
                )

            if parsed_url.path and parsed_url.path != "/":
                h["url_prefix"] = parsed_url.path

            out.append(h)
        return out

    def parse_options(self):
        """Parse ``self.argvalue`` and store the normalized hosts per cluster name in ``self.parsed_options``."""

        def normalize_to_dict(arg):
            """
            Return parsed comma separated host string as dict with "default" key.
            This is needed to support backwards compatible --target-hosts for single clusters that are not
            defined as a json string or file.
            """
            return {TargetHosts.DEFAULT: self._normalize_hosts(arg)}

        parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict)
        # Hosts that already went through normalize_to_dict are dicts by now and pass through unchanged.
        self.parsed_options = {cluster_name: self._normalize_hosts(nodes) for cluster_name, nodes in parsed_options.items()}

    @property
    def all_hosts(self):
        """Return a dict with all parsed options"""
        return self.all_options
class ClientOptions(ConnectOptions):
    """
    Convert --client-options arg to a dict.
    When no --client-options have been specified but multi-cluster --target-hosts are used,
    apply options defaults for all cluster names.
    """

    DEFAULT_CLIENT_OPTIONS = "timeout:60"

    def __init__(self, argvalue, target_hosts=None):
        """
        :param argvalue: The raw value of --client-options.
        :param target_hosts: Optional object whose ``all_hosts`` mapping provides the cluster names.
        """
        self.argname = "--client-options"
        self.argvalue = argvalue
        self.target_hosts = target_hosts
        self.parsed_options = []
        self.parse_options()

    def parse_options(self):
        """Parse ``self.argvalue`` and store the client options per cluster name in ``self.parsed_options``."""
        default_client_map = kv_to_map([ClientOptions.DEFAULT_CLIENT_OPTIONS])
        if self.argvalue == ClientOptions.DEFAULT_CLIENT_OPTIONS and self.target_hosts is not None:
            # --client-options unset but multi-clusters used in --target-hosts? apply options defaults for all cluster names.
            # Every cluster gets its own copy so that changing one cluster's options does not affect the others.
            self.parsed_options = {cluster_name: dict(default_client_map) for cluster_name in self.target_hosts.all_hosts}
        else:
            self.parsed_options = to_dict(self.argvalue, default_parser=ClientOptions.normalize_to_dict)

    @staticmethod
    def normalize_to_dict(arg):
        """
        When --client-options is a non-json csv string (single cluster mode),
        return parsed client options as dict with "default" key
        This is needed to support single cluster use of --client-options when not
        defined as a json string or file.

        Explicitly provided options take precedence over the defaults in ``DEFAULT_CLIENT_OPTIONS``.
        """
        default_client_map = kv_to_map([ClientOptions.DEFAULT_CLIENT_OPTIONS])
        return {TargetHosts.DEFAULT: {**default_client_map, **kv_to_map(arg)}}

    @property
    def all_client_options(self):
        """Return a dict with all client options"""
        return self.all_options

    @property
    def uses_static_responses(self):
        """Return the "static_responses" setting of the default client options (``False`` if unset)."""
        return self.default.get("static_responses", False)

    def with_max_connections(self, max_connections):
        """
        Return a copy of all client options with "max_connections" set for every cluster.

        A "max_connections" value that is already present in a cluster's options is preferred over
        the ``max_connections`` argument; in either case the resulting value is at least 256.
        The stored options are not modified.

        :param max_connections: Value to use for clusters that do not define "max_connections" themselves.
        :return: A new dict mapping each cluster name to its amended options.
        """
        final_client_options = {}
        for cluster, original_opts in self.all_client_options.items():
            amended_opts = dict(original_opts)
            amended_opts["max_connections"] = max(256, amended_opts.get("max_connections", max_connections))
            final_client_options[cluster] = amended_opts
        return final_client_options