def connect()

in geneve/stack/prober_elastic.py [0:0]


    def connect(self):
        es_args = shell_expand(self.es_args)
        kb_args = shell_expand(self.kb_args) or {}
        basic_auth = None

        # drop empty vars
        for var in ("api_key", "ca_certs", "verify_certs"):
            if not es_args.get(var):
                es_args.pop(var, None)
            if not kb_args.get(var):
                kb_args.pop(var, None)

        if es_args.get("basic_auth") == ["", ""]:
            es_args.pop("basic_auth", None)
        if kb_args.get("basic_auth") == ["", ""]:
            kb_args.pop("basic_auth", None)

        if "verify_certs" in es_args:
            es_args["verify_certs"] = str_to_bool(es_args["verify_certs"])
        if "verify_certs" in kb_args:
            kb_args["verify_certs"] = str_to_bool(kb_args["verify_certs"])

        try:
            es = Elasticsearch(**es_args)
            es.info()
        except AuthenticationException:
            es = None
            if "basic_auth" not in es_args:
                for filename in (f for ext in _credentials_readers for f in Path(".").glob(f"credentials-*.{ext}")):
                    basic_auth = _read_credentials(filename)
                    try:
                        es = Elasticsearch(**es_args, basic_auth=basic_auth)
                        es.info()
                        break
                    except AuthenticationException:
                        es = None
            if not es:
                raise ValueError("No valid credentials found")

        if not kb_args:
            kb_args["cloud_id"] = es_args.get("cloud_id")
            kb_args["verify_certs"] = es_args.get("verify_certs")
            kb_args["ca_certs"] = es_args.get("ca_certs")
            kb_args["basic_auth"] = basic_auth or es_args.get("basic_auth")

        while True:
            try:
                kb = Kibana(**kb_args)
                kb.status()
                break
            except Kibana.exceptions.SSLError:
                if kb_args.get("ca_certs") == es_args.get("ca_certs"):
                    raise
                kb_args["ca_certs"] = es_args.get("ca_certs")
            except Kibana.exceptions.HTTPError as e:
                if e.response.status_code != 401:  # Unauthorized
                    raise
                if kb_args.get("basic_auth") == (basic_auth or es_args.get("basic_auth")):
                    raise
                kb_args["basic_auth"] = basic_auth or es_args.get("basic_auth")

        self.es = es
        self.kb = kb