in geneve/stack/prober_elastic.py [0:0]
def connect(self):
es_args = shell_expand(self.es_args)
kb_args = shell_expand(self.kb_args) or {}
basic_auth = None
# drop empty vars
for var in ("api_key", "ca_certs", "verify_certs"):
if not es_args.get(var):
es_args.pop(var, None)
if not kb_args.get(var):
kb_args.pop(var, None)
if es_args.get("basic_auth") == ["", ""]:
es_args.pop("basic_auth", None)
if kb_args.get("basic_auth") == ["", ""]:
kb_args.pop("basic_auth", None)
if "verify_certs" in es_args:
es_args["verify_certs"] = str_to_bool(es_args["verify_certs"])
if "verify_certs" in kb_args:
kb_args["verify_certs"] = str_to_bool(kb_args["verify_certs"])
try:
es = Elasticsearch(**es_args)
es.info()
except AuthenticationException:
es = None
if "basic_auth" not in es_args:
for filename in (f for ext in _credentials_readers for f in Path(".").glob(f"credentials-*.{ext}")):
basic_auth = _read_credentials(filename)
try:
es = Elasticsearch(**es_args, basic_auth=basic_auth)
es.info()
break
except AuthenticationException:
es = None
if not es:
raise ValueError("No valid credentials found")
if not kb_args:
kb_args["cloud_id"] = es_args.get("cloud_id")
kb_args["verify_certs"] = es_args.get("verify_certs")
kb_args["ca_certs"] = es_args.get("ca_certs")
kb_args["basic_auth"] = basic_auth or es_args.get("basic_auth")
while True:
try:
kb = Kibana(**kb_args)
kb.status()
break
except Kibana.exceptions.SSLError:
if kb_args.get("ca_certs") == es_args.get("ca_certs"):
raise
kb_args["ca_certs"] = es_args.get("ca_certs")
except Kibana.exceptions.HTTPError as e:
if e.response.status_code != 401: # Unauthorized
raise
if kb_args.get("basic_auth") == (basic_auth or es_args.get("basic_auth")):
raise
kb_args["basic_auth"] = basic_auth or es_args.get("basic_auth")
self.es = es
self.kb = kb