in azure-kusto-data/azure/kusto/data/kcsb.py [0:0]
def __init__(self, connection_string: str):
    """
    Creates new KustoConnectionStringBuilder.

    The string is split on ";" into "key=value" pairs. If the first segment contains no "=",
    the whole string is prefixed with "Data Source=" so that segment is read as the data source.
    The authority id is set to "organizations" before parsing, and if ``initial_catalog`` is
    still None after parsing it is set to ``DEFAULT_DATABASE_NAME``.
    Blank segments (for example the one produced by a trailing ";") are ignored.

    :param str connection_string: Kusto connection string should be of the format:
    https://<clusterName>.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord
    For more information please look at:
    https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html
    :raises KeyError: if a boolean keyword is given a value other than True/true/False/false.
    """
    assert_string_is_not_empty(connection_string)
    self._internal_dict = {}

    # A bare cluster address (no "key=" in the first segment) is shorthand for "Data Source=<address>".
    if connection_string is not None and "=" not in connection_string.partition(";")[0]:
        connection_string = "Data Source=" + connection_string

    # Set before parsing so that a value supplied in the connection string can overwrite it.
    self[SupportedKeywords.AUTHORITY_ID] = "organizations"

    for kvp_string in connection_string.split(";"):
        if not kvp_string.strip():
            # A trailing ";" or ";;" yields an empty segment with no keyword in it; skip it
            # instead of handing an empty key to Keyword.parse.
            continue

        key, _, value = kvp_string.partition("=")
        keyword = Keyword.parse(key)
        value_stripped = value.strip()

        if keyword.is_str_type():
            if keyword.name == SupportedKeywords.DATA_SOURCE:
                # Store the data source without trailing slashes, then let
                # _parse_data_source process the stored value.
                self[keyword.name] = value_stripped.rstrip("/")
                self._parse_data_source(self.data_source)
            elif keyword.name == SupportedKeywords.TRACE_USER_NAME:
                # Tracing keywords are assigned through their attributes rather than item assignment.
                self.user_name_for_tracing = value_stripped
            elif keyword.name == SupportedKeywords.TRACE_APP_NAME:
                self.application_for_tracing = value_stripped
            else:
                self[keyword.name] = value_stripped
        elif keyword.is_bool_type():
            if value_stripped in ("True", "true"):
                self[keyword.name] = True
            elif value_stripped in ("False", "false"):
                self[keyword.name] = False
            else:
                # Report the key exactly as the caller wrote it; the check applies to every
                # boolean keyword, not only aad federated security.
                raise KeyError("Expected '%s' to be bool. Received '%s'" % (key.strip(), value))

    if self.initial_catalog is None:
        self.initial_catalog = self.DEFAULT_DATABASE_NAME