def execute()

in azure/Kqlmagic/kusto_client.py [0:0]


    def execute(self, kusto_database:str, kusto_query:str, accept_partial_results:bool=False, **options)->KqlQueryResponse:
        """ 
        Execute a simple query or management command

        Parameters
        ----------
        kusto_database : str
            Database against query will be executed.
        query : str
            Query to be executed
        accept_partial_results : bool
            Optional parameter. If query fails, but we receive some results, we consider results as partial.
            If this is True, results are returned to client, even if there are exceptions.
            If this is False, exception is raised. Default is False.
        options["timeout"] : float, optional
            Optional parameter. Network timeout in seconds. Default is no timeout.
        """
        if kusto_query.startswith("."):
            endpoint_version = self._MGMT_ENDPOINT_VERSION
            endpoint = self._mgmt_endpoint  
        else:
            endpoint_version = self._QUERY_ENDPOINT_VERSION
            endpoint = self._query_endpoint

        # print("### db: ", kusto_database, " ###")
        # print("### csl: ", kusto_query, " ###")
        # kusto_database = kusto_database.replace(" ", "")
        # print("### db: ", kusto_database, " ###")

        request_payload = {
            "db": kusto_database, 
            "csl": kusto_query,
        }

        client_version = f"{Constants.MAGIC_CLASS_NAME}.Python.Client:{self._WEB_CLIENT_VERSION}"

        client_request_id = f"{Constants.MAGIC_CLASS_NAME}.execute"
        client_request_id_tag = options.get("request_id_tag")
        if client_request_id_tag is not None:
            client_request_id = f"{client_request_id};{client_request_id_tag};{str(uuid.uuid4())}/{self._session_guid}/AzureDataExplorer"
        else:
            client_request_id = f"{client_request_id};{str(uuid.uuid4())}/{self._session_guid}/AzureDataExplorer"
            
        app = f'{Constants.MAGIC_CLASS_NAME};{options.get("notebook_app")}'
        app_tag = options.get("request_app_tag")
        if app_tag is not None:
            app = f"{app};{app_tag}"

        query_properties:dict = options.get("query_properties")  or {}

        if type(kusto_query) == str:
            first_word = kusto_query.split(maxsplit=1)[0].upper()
            # ADX SQL mode
            if first_word in ["SELECT", "UPDATE", "CREATE", "DELETE", "EXPLAIN"]:
                # SQL to Kusto cheat sheet: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sqlcheatsheet
                # MS-TDS/T-SQL Differences between Kusto Microsoft SQL Server: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/tds/sqlknownissues
                query_properties["query_language"] = "sql"

        cache_max_age = options.get("request_cache_max_age")
        if cache_max_age is not None and cache_max_age > 0:
            query_properties["query_results_cache_max_age"] = query_properties.get("query_results_cache_max_age")\
                                                              or f"{cache_max_age}s"

        if len(query_properties) > 0:
            properties = {
                "Options": query_properties,
                "Parameters": {},
                "ClientRequestId": client_request_id
            }
            request_payload["properties"] = json_dumps(properties)

        request_headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip,deflate",
            "Content-Type": "application/json; charset=utf-8",
            "x-ms-client-version": client_version,
            "x-ms-client-request-id": client_request_id,
            "x-ms-app": app
        }
        user_tag = options.get("request_user_tag")
        if user_tag is not None:
            request_headers["x-ms-user"] = user_tag
        if self._aad_helper is not None:
            request_headers["Authorization"] = self._aad_helper.acquire_token()
            request_headers["Fed"] = "True"

        cache_max_age = options.get("request_cache_max_age")
        if cache_max_age is not None:
            if cache_max_age > 0:
                request_headers["Cache-Control"] = f"max-age={cache_max_age}"
            else:
                request_headers["Cache-Control"] = "no-cache"
            
        # print("endpoint: ", endpoint)
        # print("headers: ", request_headers)
        # print("payload: ", request_payload)
        # print("timeout: ", options.get("timeout"))

        log_request_headers = request_headers
        if request_headers.get("Authorization"):
            log_request_headers = request_headers.copy()
            log_request_headers["Authorization"] = "..."  

        logger().debug(f"KustoClient::execute - POST request - url: {endpoint}, headers: {log_request_headers}, payload: {request_payload}, timeout: {options.get('timeout')}")

        # collect this information, in case bug report will be generated
        KqlClient.last_query_info = {
            "request": {
                "endpoint": endpoint,
                "headers": log_request_headers,
                "payload": request_payload,
                "timeout": options.get("timeout"),
            }
        }
        requests = Dependencies.get_module("requests")
        response = requests.post(endpoint, headers=request_headers, json=request_payload, timeout=options.get("timeout"))

        logger().debug(f"KustoClient::execute - response - status: {response.status_code}, headers: {response.headers}, payload: {response.text}")

        # print("response status code: ", response.status_code)
        # print("response", response)
        # print("response text", response.text)

        # collect this information, in case bug report will be generated
        self.last_query_info["response"] = {  # pylint: disable=unsupported-assignment-operation
            "status_code": response.status_code
        }

        if response.status_code != requests.codes.ok:  # pylint: disable=E1101
            try:
                parsed_error = json.loads(response.text)
            except:
                parsed_error = response.text
            # collect this information, in case bug report will be generated
            self.last_query_info["response"]["error"] = parsed_error  # pylint: disable=unsupported-assignment-operation, unsubscriptable-object
            raise KqlError(response.text, response)

        kql_response = KqlQueryResponse(response.json(), endpoint_version)

        if kql_response.has_exceptions() and not accept_partial_results:
            try:
                error_message = json_dumps(kql_response.get_exceptions())
            except:
                error_message = str(kql_response.get_exceptions())
            raise KqlError(error_message, response, kql_response)

        return kql_response