in elastic_enterprise_search/_utils.py [0:0]
def client_node_configs(hosts, **kwargs) -> t.List[NodeConfig]:
if hosts is None or hosts is DEFAULT:
hosts = ["http://localhost:3002"]
if not isinstance(hosts, (list, tuple)):
hosts = [hosts]
node_configs = []
for host in hosts:
if isinstance(host, str):
node_configs.append(
url_to_node_config(host, use_default_ports_for_scheme=True)
)
else:
raise TypeError("URLs must be of type 'str'")
# Remove all values which are 'DEFAULT' to avoid overwriting actual defaults.
node_options = {k: v for k, v in kwargs.items() if v is not DEFAULT}
# Set the 'User-Agent' default header.
headers = HttpHeaders(node_options.pop("headers", ()))
headers.setdefault("user-agent", USER_AGENT)
node_options["headers"] = headers
def apply_node_options(node_config: NodeConfig) -> NodeConfig:
"""Needs special handling of headers since .replace() wipes out existing headers"""
nonlocal node_options
headers = node_config.headers.copy() # type: ignore[attr-defined]
headers_to_add = node_options.pop("headers", ())
if headers_to_add:
headers.update(headers_to_add)
headers.setdefault("user-agent", USER_AGENT)
headers.freeze()
node_options["headers"] = headers
return node_config.replace(**node_options)
return [apply_node_options(node_config) for node_config in node_configs]