def execute()

in azure/Kqlmagic/draft_client.py [0:0]


    def execute(self, id:str, query:str, accept_partial_results:bool=False, **options)->Union[KqlQueryResponse, KqlSchemaResponse]:
        """ Execute a simple query or a metadata query
        
        Parameters
        ----------
        id : str
            the workspaces (log analytics) or appid (application insights).
        query : str
            Query to be executed
        accept_partial_results : bool, optional
            Optional parameter. If query fails, but we receive some results, we consider results as partial.
            If this is True, results are returned to client, even if there are exceptions.
            If this is False, exception is raised. Default is False.
        oprions["timeout"] : float, optional
            Optional parameter. Network timeout in seconds. Default is no timeout.

        Returns
        -------
        object
            KqlQueryResponse instnace if executed simple query request
            KqlSchemaResponse instnace if executed metadata request

        Raises
        ------
        KqlError
            If request to draft failed.
            If response from draft contains exceptions.
        """

        #
        # create API url
        #

        is_metadata = query == self._GET_SCHEMA_QUERY
        api_url = f"{self._data_source}/{self._API_VERSION}/{self._domain}/{id}/{'metadata' if is_metadata else 'query'}"

        #
        # create Prefer header
        #

        prefer_list = []
        if self._API_VERSION != "beta":
            prefer_list.append("ai.response-thinning=false")  # returns data as kusto v1
        
        timeout = options.get("timeout")
        if timeout is not None:
            prefer_list.append(f"wait={timeout}")

        #
        # create headers
        #
        
        client_version = f"{Constants.MAGIC_CLASS_NAME}.Python.Client:{self._WEB_CLIENT_VERSION}"        

        client_request_id = f"{Constants.MAGIC_CLASS_NAME}.execute"
        client_request_id_tag = options.get("request_id_tag")
        if client_request_id_tag is not None:
            client_request_id = f"{client_request_id};{client_request_id_tag};{str(uuid.uuid4())}/{self._session_guid}/AzureMonitor"
        else:
            client_request_id = f"{client_request_id};{str(uuid.uuid4())}/{self._session_guid}/AzureMonitor"

        app = f'{Constants.MAGIC_CLASS_NAME};{options.get("notebook_app")}'
        app_tag = options.get("request_app_tag")
        if app_tag is not None:
            app = f"{app};{app_tag}"

        request_headers = {
            "x-ms-client-version": client_version,
            "x-ms-client-request-id": client_request_id,
            "x-ms-app": app
        }
        user_tag = options.get("request_user_tag")
        if user_tag is not None:
            request_headers["x-ms-user"] = user_tag

        if self._aad_helper is not None:
            request_headers["Authorization"] = self._aad_helper.acquire_token()
        elif self._appkey is not None:
            request_headers["x-api-key"] = self._appkey

        if len(prefer_list) > 0:
            request_headers["Prefer"] = ", ".join(prefer_list)

        cache_max_age = options.get("request_cache_max_age")
        if cache_max_age is not None:
            if cache_max_age > 0:
                request_headers["Cache-Control"] = f"max-age={cache_max_age}"
            else:
                request_headers["Cache-Control"] = "no-cache"

        #
        # submit request
        #
        log_request_headers = request_headers
        if request_headers.get("Authorization"):
            log_request_headers = request_headers.copy()
            log_request_headers["Authorization"] = "..." 

        # collect this inormation, in case bug report will be generated
        KqlClient.last_query_info = {
            "request": {
                "endpoint": api_url,
                "headers": log_request_headers,
                "timeout": options.get("timeout"),
            }
        }
        requests = Dependencies.get_module("requests")
        if is_metadata:
            logger().debug(f"DraftClient::execute - GET request - url: {api_url}, headers: {log_request_headers}, timeout: {options.get('timeout')}")
            response = requests.get(api_url, headers=request_headers, timeout=options.get("timeout"))
        else:
            request_payload = {
                "query": query
            }

            # Implicit Cross Workspace Queries: https://dev.loganalytics.io/oms/documentation/3-Using-the-API/CrossResourceQuery
            # workspaces - string[] - A list of workspaces that are included in the query.
            if type(options.get("query_properties")) == dict:
                resources = options.get("query_properties").get(self.resources_name)
                if type(resources) == list and len(resources) > 0:
                    request_payload[self.resources_name] = resources

                timespan = options.get("query_properties").get("timespan")
                if type(timespan) == str and len(timespan) > 0:
                    request_payload["timespan"] = timespan

            logger().debug(f"DraftClient::execute - POST request - url: {api_url}, headers: {log_request_headers}, payload: {request_payload}, timeout: {options.get('timeout')}")

            # collect this inormation, in case bug report will be generated
            self.last_query_info["request"]["payload"] = request_payload  # pylint: disable=unsupported-assignment-operation, unsubscriptable-object

            response = requests.post(api_url, headers=request_headers, json=request_payload, timeout=options.get("timeout"))

        logger().debug(f"DraftClient::execute - response - status: {response.status_code}, headers: {response.headers}, payload: {response.text}")
        #
        # handle response
        #
        # collect this inormation, in case bug report will be generated
        self.last_query_info["response"] = {  # pylint: disable=unsupported-assignment-operation
            "status_code": response.status_code
        }

        if response.status_code != requests.codes.ok:  # pylint: disable=E1101
            try:
                parsed_error = json.loads(response.text)
            except:
                parsed_error = response.text
            # collect this inormation, in case bug report will be generated
            self.last_query_info["response"]["error"] = parsed_error  # pylint: disable=unsupported-assignment-operation, unsubscriptable-object
            raise KqlError(response.text, response)

        json_response = response.json()

        if is_metadata:
            kql_response = KqlSchemaResponse(json_response)
        else:
            kql_response = KqlQueryResponse(json_response)

        if kql_response.has_exceptions() and not accept_partial_results:
            try:
                error_message = json_dumps(kql_response.get_exceptions())
            except:
                error_message = str(kql_response.get_exceptions())
            raise KqlError(error_message, response, kql_response)

        return kql_response