azure-kusto-data/azure/kusto/data/kcsb.py (396 lines of code) (raw):
import dataclasses
from enum import unique, Enum
from typing import Union, Callable, Coroutine, Optional, Tuple, List, Any, ClassVar
from urllib.parse import urlparse
from ._string_utils import assert_string_is_not_empty
from ._token_providers import DeviceCallbackType
from .client_details import ClientDetails
from .helpers import load_bundled_json
# Sentinel stored in Keyword's lookup table for keywords that are recognised but not
# supported by this SDK; Keyword.parse raises KeyError when it resolves a key to this value.
UNSUPPORTED_KEYWORD = "UNSUPPORTED"
@unique
class SupportedKeywords(Enum):
    """
    Connection-string keywords that this SDK accepts.

    Each member's value is the canonical keyword name. Definition order matters:
    KustoConnectionStringBuilder._build_connection_string iterates this enum to decide the
    order in which keywords are rendered.
    """

    # Endpoint and default database
    DATA_SOURCE = "Data Source"
    INITIAL_CATALOG = "Initial Catalog"
    # Authentication
    FEDERATED_SECURITY = "AAD Federated Security"
    APPLICATION_CLIENT_ID = "Application Client Id"
    APPLICATION_KEY = "Application Key"
    USER_ID = "User ID"
    PASSWORD = "Password"
    AUTHORITY_ID = "Authority Id"
    APPLICATION_TOKEN = "Application Token"
    USER_TOKEN = "User Token"
    APPLICATION_CERTIFICATE_BLOB = "Application Certificate Blob"
    APPLICATION_CERTIFICATE_X5C = "Application Certificate SendX5c"
    APPLICATION_CERTIFICATE_THUMBPRINT = "Application Certificate Thumbprint"
    # Tracing
    TRACE_APP_NAME = "Application Name for Tracing"
    TRACE_USER_NAME = "User Name for Tracing"
@unique
class UnsupportedKeywords(Enum):
    """
    Connection-string keywords that are recognised but not supported by this SDK.

    Keyword.init_lookup maps these names (and their aliases) to UNSUPPORTED_KEYWORD, so
    Keyword.parse rejects them with a "not supported by this SDK" KeyError instead of
    "Unknown keyword".
    """

    DSTS_FEDERATED_SECURITY = "dSTS Federated Security"
    STREAMING = "Streaming"
    UNCOMPRESSED = "Uncompressed"
    ENFORCE_MFA = "EnforceMfa"
    ACCEPT = "Accept"
    QUERY_CONSISTENCY = "Query Consistency"
    DATA_SOURCE_URI = "Data Source Uri"
    AZURE_REGION = "Azure Region"
    NAMESPACE = "Namespace"
    APPLICATION_CERTIFICATE_ISSUER_DISTINGUISHED_NAME = "Application Certificate Issuer Distinguished Name"
    APPLICATION_CERTIFICATE_SUBJECT_DISTINGUISHED_NAME = "Application Certificate Subject Distinguished Name"
@dataclasses.dataclass(frozen=True)
class Keyword:
    """
    Describes one supported connection-string keyword and resolves keyword spellings.

    An instance carries the canonical keyword, the value type and the secret flag read from
    the bundled ``kcsb.json``. The class-level ``_lookup`` table maps every normalized name
    and alias either to a ``Keyword`` instance or to the ``UNSUPPORTED_KEYWORD`` sentinel.
    """

    _supported_keywords: ClassVar[List[str]] = [member.value for member in SupportedKeywords]
    _unsupported_keywords: ClassVar[List[str]] = [member.value for member in UnsupportedKeywords]
    # Populated by init_lookup(); accessing it earlier raises AttributeError.
    _lookup: ClassVar[dict]

    name: SupportedKeywords
    type: str
    secret: bool

    def is_str_type(self) -> bool:
        """Return True when the keyword's declared type is ``"string"``."""
        return self.type == "string"

    def is_bool_type(self) -> bool:
        """Return True when the keyword's declared type is ``"bool"``."""
        return self.type == "bool"

    @staticmethod
    def normalize_string(key: str) -> str:
        """Lower-case ``key`` and drop its space characters so that spellings compare equal."""
        lowered = key.lower()
        return lowered.replace(" ", "")

    @classmethod
    def init_lookup(cls):
        """
        Build the normalized-name lookup table from the bundled ``kcsb.json``.

        :raises KeyError: if the JSON lists a keyword that is in neither SupportedKeywords
            nor UnsupportedKeywords.
        """
        table = {}
        for entry in load_bundled_json("kcsb.json")["keywords"]:
            canonical = entry["name"]
            if canonical in cls._supported_keywords:
                target = Keyword(SupportedKeywords(canonical), entry["type"], entry["secret"])
            elif canonical in cls._unsupported_keywords:
                target = UNSUPPORTED_KEYWORD
            else:
                raise KeyError(f"Unknown keyword: `{canonical}`")

            # The canonical name and every alias resolve to the same target.
            for spelling in (canonical, *entry["aliases"]):
                table[Keyword.normalize_string(spelling)] = target
        cls._lookup = table

    @classmethod
    def parse(cls, key: Union[str, SupportedKeywords]) -> "Keyword":
        """
        Resolve a keyword name, alias or SupportedKeywords member to its ``Keyword``.

        :raises KeyError: if the keyword is unknown, or known but not supported by this SDK.
        """
        raw_key = key.value if isinstance(key, SupportedKeywords) else key
        # Table values are never None, so None reliably means "not present".
        resolved = cls._lookup.get(Keyword.normalize_string(raw_key))
        if resolved is None:
            raise KeyError(f"Unknown keyword: `{raw_key}`")
        if resolved == UNSUPPORTED_KEYWORD:
            raise KeyError(f"Keyword `{raw_key}` is not supported by this SDK")
        return resolved

    @classmethod
    def lookup(cls, key: Union[str, SupportedKeywords]) -> "Keyword":
        """
        Return the raw lookup-table entry for ``key`` without the support check.

        Unlike parse(), this returns the ``UNSUPPORTED_KEYWORD`` sentinel for unsupported
        keywords, and an unknown key raises the dict's own KeyError.
        """
        if isinstance(key, SupportedKeywords):
            return cls._lookup[Keyword.normalize_string(key.value)]
        return cls._lookup[Keyword.normalize_string(key)]
# Build the keyword lookup table once at import time so Keyword.parse / Keyword.lookup
# are usable as soon as this module is loaded.
Keyword.init_lookup()
class KustoConnectionStringBuilder:
    """
    Parses Kusto connection strings.
    For usages, check out the sample at:
        https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py
    """

    DEFAULT_DATABASE_NAME = "NetDefaultDB"

    # Authentication-mode flags and their extra parameters. These are class-level defaults;
    # the ``with_*`` factory methods override them on the instance they return.
    interactive_login: bool = False
    az_cli_login: bool = False
    device_login: bool = False
    token_credential_login: bool = False
    device_callback: DeviceCallbackType = None
    msi_authentication: bool = False
    msi_parameters: Optional[dict] = None
    token_provider: Optional[Callable[[], str]] = None
    async_token_provider: Optional[Callable[[], Coroutine[None, None, str]]] = None
    application_for_tracing: Optional[str] = None
    user_name_for_tracing: Optional[str] = None
    azure_credential: Optional[Any] = None
    azure_credential_from_login_endpoint: Optional[Any] = None
    application_public_certificate: Optional[str] = None

    def __init__(self, connection_string: str):
        """
        Creates new KustoConnectionStringBuilder.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord
        For more information please look at:
        https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html
        :raises KeyError: if a keyword is unknown or unsupported, or if a boolean keyword has a
            value other than True/true/False/false.
        """
        assert_string_is_not_empty(connection_string)
        self._internal_dict = {}

        # A first segment without '=' is a bare cluster URL: treat it as the Data Source.
        if connection_string is not None and "=" not in connection_string.partition(";")[0]:
            connection_string = "Data Source=" + connection_string

        # Default authority; a later assignment to the same keyword replaces it.
        self[SupportedKeywords.AUTHORITY_ID] = "organizations"

        for kvp_string in connection_string.split(";"):
            if not kvp_string.strip():
                # Tolerate a trailing ';' or an empty segment instead of failing on an empty keyword.
                continue

            key, _, value = kvp_string.partition("=")
            keyword = Keyword.parse(key)
            value_stripped = value.strip()

            if keyword.is_str_type():
                if keyword.name == SupportedKeywords.DATA_SOURCE:
                    self[keyword.name] = value_stripped.rstrip("/")
                    # May move a single path segment of the URL into the initial catalog.
                    self._parse_data_source(self.data_source)
                elif keyword.name == SupportedKeywords.TRACE_USER_NAME:
                    self.user_name_for_tracing = value_stripped
                elif keyword.name == SupportedKeywords.TRACE_APP_NAME:
                    self.application_for_tracing = value_stripped
                else:
                    self[keyword.name] = value_stripped
            elif keyword.is_bool_type():
                if value_stripped in ["True", "true"]:
                    self[keyword.name] = True
                elif value_stripped in ["False", "false"]:
                    self[keyword.name] = False
                else:
                    # Name the keyword that actually failed rather than a hard-coded one.
                    raise KeyError("Expected %s to be bool. Received %s" % (keyword.name.value, value))

        if self.initial_catalog is None:
            self.initial_catalog = self.DEFAULT_DATABASE_NAME

    def __setitem__(self, key: "Union[SupportedKeywords, str]", value: Union[str, bool, dict]):
        """
        Stores ``value`` under the keyword that ``key`` resolves to.
        String-typed keywords are stored stripped of surrounding whitespace; bool-typed keywords
        must be given an actual bool.
        :param key: a keyword name, alias or SupportedKeywords member.
        :param value: the value to store.
        :raises TypeError: if ``value`` is None, or is not a bool for a bool-typed keyword.
        :raises KeyError: if the keyword is unknown/unsupported, or is neither string- nor bool-typed.
        """
        keyword = Keyword.parse(key)
        if value is None:
            raise TypeError("Value cannot be None.")

        if keyword.is_str_type():
            self._internal_dict[keyword.name] = value.strip()
        elif keyword.is_bool_type():
            if not isinstance(value, bool):
                raise TypeError("Expected %s to be bool" % key)
            self._internal_dict[keyword.name] = value
        else:
            raise KeyError("KustoConnectionStringBuilder supports only bools and strings.")

    @classmethod
    def with_aad_user_password_authentication(
        cls, connection_string: str, user_id: str, password: str, authority_id: str = "organizations"
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD user name and
        password.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param str user_id: AAD user ID.
        :param str password: Corresponding password of the AAD user.
        :param str authority_id: optional param. defaults to "organizations"
        """
        assert_string_is_not_empty(user_id)
        assert_string_is_not_empty(password)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb[SupportedKeywords.USER_ID] = user_id
        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
        kcsb[SupportedKeywords.PASSWORD] = password
        return kcsb

    @classmethod
    def with_aad_user_token_authentication(cls, connection_string: str, user_token: str) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with an AAD user token.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str user_token: AAD user token.
        """
        assert_string_is_not_empty(user_token)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb[SupportedKeywords.USER_TOKEN] = user_token
        return kcsb

    @classmethod
    def with_aad_application_key_authentication(
        cls, connection_string: str, aad_app_id: str, app_key: str, authority_id: str
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD application and key.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param str aad_app_id: AAD application ID.
        :param str app_key: Corresponding key of the AAD application.
        :param str authority_id: Authority id (aka Tenant id) must be provided
        """
        assert_string_is_not_empty(aad_app_id)
        assert_string_is_not_empty(app_key)
        assert_string_is_not_empty(authority_id)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
        kcsb[SupportedKeywords.APPLICATION_KEY] = app_key
        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
        return kcsb

    @classmethod
    def with_aad_application_certificate_authentication(
        cls, connection_string: str, aad_app_id: str, certificate: str, thumbprint: str, authority_id: str
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD application using
        a certificate.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str aad_app_id: AAD application ID.
        :param str certificate: A PEM encoded certificate private key.
        :param str thumbprint: hex encoded thumbprint of the certificate.
        :param str authority_id: Authority id (aka Tenant id) must be provided
        """
        assert_string_is_not_empty(aad_app_id)
        assert_string_is_not_empty(certificate)
        assert_string_is_not_empty(thumbprint)
        assert_string_is_not_empty(authority_id)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = certificate
        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint
        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
        return kcsb

    @classmethod
    def with_aad_application_certificate_sni_authentication(
        cls, connection_string: str, aad_app_id: str, private_certificate: str, public_certificate: str, thumbprint: str, authority_id: str
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD application using
        a certificate Subject Name and Issuer.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str aad_app_id: AAD application ID.
        :param str private_certificate: A PEM encoded certificate private key.
        :param str public_certificate: A public certificate matching the provided PEM certificate private key.
        :param str thumbprint: hex encoded thumbprint of the certificate.
        :param str authority_id: Authority id (aka Tenant id) must be provided
        """
        assert_string_is_not_empty(aad_app_id)
        assert_string_is_not_empty(private_certificate)
        assert_string_is_not_empty(public_certificate)
        assert_string_is_not_empty(thumbprint)
        assert_string_is_not_empty(authority_id)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = private_certificate
        # The public certificate has no connection-string keyword, so it is kept as a plain attribute.
        kcsb.application_public_certificate = public_certificate
        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint
        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
        return kcsb

    @classmethod
    def with_aad_application_token_authentication(cls, connection_string: str, application_token: str) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD application and
        an application token.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str application_token: AAD application token.
        """
        assert_string_is_not_empty(application_token)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb[SupportedKeywords.APPLICATION_TOKEN] = application_token
        return kcsb

    @classmethod
    def with_aad_device_authentication(
        cls, connection_string: str, authority_id: str = "organizations", callback: DeviceCallbackType = None
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD using device login
        (the user is given a verification URL and a code to enter there).
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param str authority_id: optional param. defaults to "organizations"
        :param DeviceCallbackType callback: optional callback function to be called when authentication is required, accepts three parameters:
                - ``verification_uri`` (str) the URL the user must visit
                - ``user_code`` (str) the code the user must enter there
                - ``expires_on`` (datetime.datetime) the UTC time at which the code will expire
        """
        kcsb = cls(connection_string)
        kcsb.device_login = True
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
        kcsb.device_callback = callback
        return kcsb

    @classmethod
    def with_az_cli_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will use an existing authenticated az cli profile.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        """
        kcsb = cls(connection_string)
        kcsb.az_cli_login = True
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        return kcsb

    @classmethod
    def with_aad_managed_service_identity_authentication(
        cls, connection_string: str, client_id: str = None, object_id: str = None, msi_res_id: str = None, timeout: int = None
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD application, using
        an application token obtained from a Managed Service Identity endpoint. An optional user
        assigned application ID can be added to the token.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param client_id: an optional user assigned identity provided as an Azure ID of a client
        :param object_id: an optional user assigned identity provided as an Azure ID of an object (currently rejected)
        :param msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource (currently rejected)
        :param timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur
        :raises ValueError: if ``object_id`` or ``msi_res_id`` is provided.
        """
        kcsb = cls(connection_string)
        params = {}

        if timeout is not None:
            params["connection_timeout"] = timeout

        if client_id is not None:
            params["client_id"] = client_id

        # Until we upgrade azure-identity to version 1.4.1, only client_id is accepted as a hint for user managed service identity.
        # NOTE: when object_id / msi_res_id become supported, store them in `params` and re-introduce a check
        # that client_id, object_id and msi_res_id are mutually exclusive. While they are rejected outright,
        # at most one identity hint can ever be set, so no such check is needed here.
        if object_id is not None:
            raise ValueError("User Managed Service Identity with object_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead.")

        if msi_res_id is not None:
            raise ValueError(
                "User Managed Service Identity with msi_res_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead."
            )

        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb.msi_authentication = True
        kcsb.msi_parameters = params
        return kcsb

    @classmethod
    def with_token_provider(cls, connection_string: str, token_provider: Callable[[], str]) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string
        """
        assert callable(token_provider)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb.token_provider = token_provider
        return kcsb

    @classmethod
    def with_async_token_provider(
        cls,
        connection_string: str,
        async_token_provider: Callable[[], Coroutine[None, None, str]],
    ) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string
        """
        assert callable(async_token_provider)

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb.async_token_provider = async_token_provider
        return kcsb

    @classmethod
    def with_interactive_login(
        cls, connection_string: str, user_id_hint: Optional[str] = None, tenant_hint: Optional[str] = None
    ) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder flagged for interactive login.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param user_id_hint: optional value stored as the User ID keyword (exposed via ``login_hint``)
        :param tenant_hint: optional value stored as the Authority Id keyword (exposed via ``domain_hint``)
        """
        kcsb = cls(connection_string)
        kcsb.interactive_login = True
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        if user_id_hint is not None:
            kcsb[SupportedKeywords.USER_ID] = user_id_hint
        if tenant_hint is not None:
            kcsb[SupportedKeywords.AUTHORITY_ID] = tenant_hint
        return kcsb

    @classmethod
    def with_azure_token_credential(
        cls,
        connection_string: str,
        credential: Optional[Any] = None,
        credential_from_login_endpoint: Optional[Callable[[str], Any]] = None,
    ) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token.
        :param connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param credential: an optional token credential to use for authentication
        :param credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource
        """
        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb.token_credential_login = True
        kcsb.azure_credential = credential
        kcsb.azure_credential_from_login_endpoint = credential_from_login_endpoint
        return kcsb

    @classmethod
    def with_no_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses no authentication.
        :param connection_string: Kusto's connection string should be of the format: http://<clusterName>.kusto.windows.net
        :raises ValueError: if ``connection_string`` does not start with ``http://``.
        """
        if not connection_string.startswith("http://"):
            raise ValueError("Connection string must start with http://")

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = False
        return kcsb

    @property
    def data_source(self) -> Optional[str]:
        """The URI specifying the Kusto service endpoint.
        For example, https://kuskus.kusto.windows.net or net.tcp://localhost
        """
        return self._internal_dict.get(SupportedKeywords.DATA_SOURCE)

    @property
    def initial_catalog(self) -> Optional[str]:
        """The default database to be used for requests.
        By default, it is set to 'NetDefaultDB'.
        """
        return self._internal_dict.get(SupportedKeywords.INITIAL_CATALOG)

    @initial_catalog.setter
    def initial_catalog(self, value: str) -> None:
        # Written straight to the dict, so unlike __setitem__ the value is not stripped or validated.
        self._internal_dict[SupportedKeywords.INITIAL_CATALOG] = value

    @property
    def aad_user_id(self) -> Optional[str]:
        """The username to use for AAD Federated AuthN."""
        return self._internal_dict.get(SupportedKeywords.USER_ID)

    @property
    def application_client_id(self) -> Optional[str]:
        """The application client id to use for authentication when federated
        authentication is used.
        """
        return self._internal_dict.get(SupportedKeywords.APPLICATION_CLIENT_ID)

    @property
    def application_key(self) -> Optional[str]:
        """The application key to use for authentication when federated authentication is used"""
        return self._internal_dict.get(SupportedKeywords.APPLICATION_KEY)

    @property
    def application_certificate(self) -> Optional[str]:
        """A PEM encoded certificate private key."""
        return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_BLOB)

    @application_certificate.setter
    def application_certificate(self, value: str):
        self[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = value

    @property
    def application_certificate_thumbprint(self) -> Optional[str]:
        """hex encoded thumbprint of the certificate."""
        return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT)

    @application_certificate_thumbprint.setter
    def application_certificate_thumbprint(self, value: str):
        self[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = value

    @property
    def authority_id(self) -> Optional[str]:
        """The ID of the AAD tenant where the application is configured.
        (should be supplied only for non-Microsoft tenant)"""
        return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID)

    @authority_id.setter
    def authority_id(self, value: str):
        self[SupportedKeywords.AUTHORITY_ID] = value

    @property
    def aad_federated_security(self) -> Optional[bool]:
        """A Boolean value that instructs the client to perform AAD federated authentication."""
        return self._internal_dict.get(SupportedKeywords.FEDERATED_SECURITY)

    @property
    def user_token(self) -> Optional[str]:
        """User token."""
        return self._internal_dict.get(SupportedKeywords.USER_TOKEN)

    @property
    def application_token(self) -> Optional[str]:
        """Application token."""
        return self._internal_dict.get(SupportedKeywords.APPLICATION_TOKEN)

    @property
    def client_details(self) -> ClientDetails:
        """A ClientDetails built from the current application and user tracing names."""
        return ClientDetails(self.application_for_tracing, self.user_name_for_tracing)

    @property
    def login_hint(self) -> Optional[str]:
        """The stored User ID value (same value as ``aad_user_id``)."""
        return self._internal_dict.get(SupportedKeywords.USER_ID)

    @property
    def domain_hint(self) -> Optional[str]:
        """The stored Authority Id value (same value as ``authority_id``)."""
        return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID)

    @property
    def password(self) -> Optional[str]:
        """The stored Password value."""
        return self._internal_dict.get(SupportedKeywords.PASSWORD)

    def _set_connector_details(
        self,
        name: str,
        version: str,
        app_name: Optional[str] = None,
        app_version: Optional[str] = None,
        send_user: bool = False,
        override_user: Optional[str] = None,
        additional_fields: Optional[List[Tuple[str, str]]] = None,
    ):
        """
        Sets the connector details for tracing purposes.
        :param name:  The name of the connector
        :param version:  The version of the connector
        :param send_user: Whether to send the user name
        :param override_user: Override the user name ( if send_user is True )
        :param app_name: The name of the containing application
        :param app_version: The version of the containing application
        :param additional_fields: Additional fields to add to the header
        """
        client_details = ClientDetails.set_connector_details(name, version, app_name, app_version, send_user, override_user, additional_fields)

        self.application_for_tracing = client_details.application_for_tracing
        self.user_name_for_tracing = client_details.user_name_for_tracing

    def __str__(self) -> str:
        """Render the connection string with the value of every secret keyword replaced by ``****``."""
        masked = {key: "****" if Keyword.lookup(key).secret else value for key, value in self._internal_dict.items()}
        return self._build_connection_string(masked)

    def __repr__(self) -> str:
        """Render the full connection string.

        NOTE(security): unlike ``__str__``, secret values are NOT masked here, so avoid
        sending ``repr()`` of this object to logs.
        """
        return self._build_connection_string(self._internal_dict)

    def _build_connection_string(self, kcsb_as_dict: dict) -> str:
        """Join the entries of ``kcsb_as_dict`` as ``name=value`` pairs separated by ``;``, in SupportedKeywords order."""
        return ";".join(f"{word.value}={kcsb_as_dict[word]}" for word in SupportedKeywords if word in kcsb_as_dict)

    def _parse_data_source(self, url: str):
        """
        Split a ``https://host/database`` style data source into endpoint and database.
        When the URL has a network location, exactly one non-empty path segment, and no initial
        catalog is set yet, that segment becomes the initial catalog and the stored data source is
        rewritten without the path. Otherwise nothing is changed.
        :param url: the data source value to inspect.
        """
        parsed = urlparse(url)
        if not parsed.netloc:
            # Not an absolute URL with a host: leave the data source as given.
            return

        segments = parsed.path.lstrip("/").split("/")
        if len(segments) == 1 and segments[0] and not self.initial_catalog:
            self.initial_catalog = segments[0]
            self._internal_dict[SupportedKeywords.DATA_SOURCE] = parsed._replace(path="").geturl()