def get_cloud_info_for_cluster()

in azure-kusto-data/azure/kusto/data/_cloud_settings.py [0:0]


    def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None) -> CloudInfo:
        """
        Return the cloud metadata for a Kusto cluster, fetching and caching it on first use.

        The URI is normalized with ``cls._normalize_uri`` and the result is used as the cache key.
        On a cache miss, an HTTP GET is issued to ``<scheme>://<netloc>/<METADATA_ENDPOINT>`` and the
        outcome is stored in ``cls._cloud_cache``:

        * 200 with an ``"AzureAD"`` object: a ``CloudInfo`` built from that object.
        * 200 without an ``"AzureAD"`` entry (key absent or null): ``cls.DEFAULT_CLOUD``.
        * 404: ``cls.DEFAULT_CLOUD``.

        :param kusto_uri: URI of the cluster whose metadata is requested.
        :param proxies: Optional mapping forwarded to ``requests.get`` as its ``proxies`` argument.
        :return: The cached or newly fetched ``CloudInfo`` for the normalized URI.
        :raises KustoNetworkError: If the HTTP GET raises any exception.
        :raises KustoServiceError: If a 200 response carries a null or empty JSON body, or if the
            status code is neither 200 nor 404.
        :raises KeyError: If the ``"AzureAD"`` object is present but lacks one of the expected fields.
        """
        kusto_uri = cls._normalize_uri(kusto_uri)

        # tracing attributes for cloud info (set on every call, including cache hits)
        Span.set_cloud_info_attributes(kusto_uri)

        if kusto_uri in cls._cloud_cache:  # Double-checked locking to avoid unnecessary lock access
            return cls._cloud_cache[kusto_uri]

        with cls._cloud_cache_lock:
            # Re-check under the lock: another thread may have populated the entry in the meantime.
            if kusto_uri in cls._cloud_cache:
                return cls._cloud_cache[kusto_uri]

            url_parts = urlparse(kusto_uri)
            url = f"{url_parts.scheme}://{url_parts.netloc}/{METADATA_ENDPOINT}"

            try:
                # trace http get call for result
                # NOTE(review): no timeout is passed, and this call runs while the cache lock is held -
                # confirm whether a stalled endpoint blocking other callers is acceptable.
                result = MonitoredActivity.invoke(
                    lambda: requests.get(url, proxies=proxies, allow_redirects=False),
                    name_of_span="CloudSettings.http_get",
                    tracing_attributes=Span.create_http_attributes(url=url, method="GET"),
                )
            except Exception as e:
                raise KustoNetworkError(url) from e

            if result.status_code == 200:
                content = result.json()
                if content is None or content == {}:
                    raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
                # Use .get() so a payload without the "AzureAD" key takes the default-cloud fallback
                # below instead of raising a bare KeyError.
                root = content.get("AzureAD")
                if root is not None:
                    cls._cloud_cache[kusto_uri] = CloudInfo(
                        login_endpoint=root["LoginEndpoint"],
                        login_mfa_required=root["LoginMfaRequired"],
                        kusto_client_app_id=root["KustoClientAppId"],
                        kusto_client_redirect_uri=root["KustoClientRedirectUri"],
                        kusto_service_resource_id=root["KustoServiceResourceId"],
                        first_party_authority_url=root["FirstPartyAuthorityUrl"],
                    )
                else:
                    cls._cloud_cache[kusto_uri] = cls.DEFAULT_CLOUD
            elif result.status_code == 404:
                # For now, as long as not all proxies implement the metadata endpoint, return public cloud data if no endpoint exists
                cls._cloud_cache[kusto_uri] = cls.DEFAULT_CLOUD
            else:
                raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
            return cls._cloud_cache[kusto_uri]