in azure/Kqlmagic/kusto_client.py [0:0]
def execute(self, kusto_database:str, kusto_query:str, accept_partial_results:bool=False, **options)->KqlQueryResponse:
    """
    Execute a simple query or management command.

    The query is POSTed as JSON to the management endpoint when it starts
    with "." and to the query endpoint otherwise. Request and response
    summaries are stored in ``last_query_info`` so that a bug report can be
    generated later.

    Parameters
    ----------
    kusto_database : str
        Database against which the query will be executed.
    kusto_query : str
        Query (or management command) to be executed.
    accept_partial_results : bool
        Optional parameter. If query fails, but we receive some results, we consider results as partial.
        If this is True, results are returned to client, even if there are exceptions.
        If this is False, exception is raised. Default is False.
    options["timeout"] : float, optional
        Network timeout in seconds, passed to the HTTP POST. Default is no timeout.
    options["request_id_tag"] : str, optional
        Tag embedded in the client request id.
    options["notebook_app"] : str, optional
        Notebook application name, embedded in the "x-ms-app" header.
    options["request_app_tag"] : str, optional
        Tag appended to the "x-ms-app" header.
    options["request_user_tag"] : str, optional
        Value of the "x-ms-user" header; header is omitted when not set.
    options["query_properties"] : dict, optional
        Request options sent under "Options" in the request properties.
        The dict is copied; the caller's object is never modified.
    options["request_cache_max_age"] : number, optional
        When > 0, sets "Cache-Control: max-age=<value>" and, unless already
        present in query_properties, "query_results_cache_max_age".
        When <= 0, sets "Cache-Control: no-cache".

    Returns
    -------
    KqlQueryResponse
        Parsed response of the service.

    Raises
    ------
    KqlError
        If the HTTP status is not OK, or if the response contains exceptions
        and accept_partial_results is False.
    """
    # a leading "." selects the management endpoint, anything else the query endpoint
    if kusto_query.startswith("."):
        endpoint_version = self._MGMT_ENDPOINT_VERSION
        endpoint = self._mgmt_endpoint
    else:
        endpoint_version = self._QUERY_ENDPOINT_VERSION
        endpoint = self._query_endpoint

    timeout = options.get("timeout")
    cache_max_age = options.get("request_cache_max_age")

    request_payload = {
        "db": kusto_database,
        "csl": kusto_query,
    }

    client_version = f"{Constants.MAGIC_CLASS_NAME}.Python.Client:{self._WEB_CLIENT_VERSION}"

    # request id layout: <class>.execute[;<tag>];<uuid>/<session guid>/AzureDataExplorer
    client_request_id = f"{Constants.MAGIC_CLASS_NAME}.execute"
    client_request_id_tag = options.get("request_id_tag")
    if client_request_id_tag is not None:
        client_request_id = f"{client_request_id};{client_request_id_tag}"
    client_request_id = f"{client_request_id};{str(uuid.uuid4())}/{self._session_guid}/AzureDataExplorer"

    app = f'{Constants.MAGIC_CLASS_NAME};{options.get("notebook_app")}'
    app_tag = options.get("request_app_tag")
    if app_tag is not None:
        app = f"{app};{app_tag}"

    # work on a copy, so keys added below do not leak into the caller's dict
    query_properties:dict = dict(options.get("query_properties") or {})

    if isinstance(kusto_query, str):
        # an empty or whitespace-only query has no first word; split() returns [] in that case
        words = kusto_query.split(maxsplit=1)
        first_word = words[0].upper() if words else ""
        # ADX SQL mode
        if first_word in ("SELECT", "UPDATE", "CREATE", "DELETE", "EXPLAIN"):
            # SQL to Kusto cheat sheet: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sqlcheatsheet
            # MS-TDS/T-SQL Differences between Kusto Microsoft SQL Server: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/tds/sqlknownissues
            query_properties["query_language"] = "sql"

    if cache_max_age is not None and cache_max_age > 0:
        # an explicit value already present in query_properties takes precedence
        query_properties["query_results_cache_max_age"] = query_properties.get("query_results_cache_max_age")\
            or f"{cache_max_age}s"

    # properties are attached to the payload only when there is at least one option to send
    if query_properties:
        properties = {
            "Options": query_properties,
            "Parameters": {},
            "ClientRequestId": client_request_id
        }
        request_payload["properties"] = json_dumps(properties)

    request_headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip,deflate",
        "Content-Type": "application/json; charset=utf-8",
        "x-ms-client-version": client_version,
        "x-ms-client-request-id": client_request_id,
        "x-ms-app": app
    }

    user_tag = options.get("request_user_tag")
    if user_tag is not None:
        request_headers["x-ms-user"] = user_tag

    if self._aad_helper is not None:
        request_headers["Authorization"] = self._aad_helper.acquire_token()
        request_headers["Fed"] = "True"

    if cache_max_age is not None:
        if cache_max_age > 0:
            request_headers["Cache-Control"] = f"max-age={cache_max_age}"
        else:
            request_headers["Cache-Control"] = "no-cache"

    # never log or store the real token: use a copy of the headers with the value masked
    log_request_headers = request_headers
    if request_headers.get("Authorization"):
        log_request_headers = request_headers.copy()
        log_request_headers["Authorization"] = "..."

    logger().debug(f"KustoClient::execute - POST request - url: {endpoint}, headers: {log_request_headers}, payload: {request_payload}, timeout: {timeout}")

    # collect this information, in case bug report will be generated
    # NOTE(review): stored on KqlClient but updated through self below - presumably this
    # class derives from KqlClient, so both refer to the same dict; confirm.
    KqlClient.last_query_info = {
        "request": {
            "endpoint": endpoint,
            "headers": log_request_headers,
            "payload": request_payload,
            "timeout": timeout,
        }
    }

    requests = Dependencies.get_module("requests")
    response = requests.post(endpoint, headers=request_headers, json=request_payload, timeout=timeout)

    logger().debug(f"KustoClient::execute - response - status: {response.status_code}, headers: {response.headers}, payload: {response.text}")

    # collect this information, in case bug report will be generated
    self.last_query_info["response"] = {  # pylint: disable=unsupported-assignment-operation
        "status_code": response.status_code
    }

    if response.status_code != requests.codes.ok:  # pylint: disable=E1101
        # best effort: keep the error as structured data if it is json, otherwise as raw text
        try:
            parsed_error = json.loads(response.text)
        except Exception:
            parsed_error = response.text
        # collect this information, in case bug report will be generated
        self.last_query_info["response"]["error"] = parsed_error  # pylint: disable=unsupported-assignment-operation, unsubscriptable-object
        raise KqlError(response.text, response)

    kql_response = KqlQueryResponse(response.json(), endpoint_version)

    if kql_response.has_exceptions() and not accept_partial_results:
        # best effort: prefer a json error message, fall back to str() if not serializable
        try:
            error_message = json_dumps(kql_response.get_exceptions())
        except Exception:
            error_message = str(kql_response.get_exceptions())
        raise KqlError(error_message, response, kql_response)

    return kql_response