in msticpy/data/drivers/kusto_driver.py [0:0]
def _get_connection_string(self, query_source: QuerySource = None, **kwargs):
    """
    Create a connection string from arguments and configuration.

    Sources are tried in this order:

    1. ``connection_str`` keyword argument - returned as-is when truthy.
    2. ``cluster`` and ``database`` keyword arguments, when both resolve
       to truthy values.
    3. ``query_source.metadata``: "cluster" fills in a missing cluster and
       the first "data_families" entry fills in a missing database (the
       text after the first "." is used, or the whole entry if it
       contains no dot).

    Parameters
    ----------
    query_source : QuerySource, optional
        Query definition; only its ``metadata`` mapping is read here.

    Other Parameters
    ----------------
    connection_str : str, optional
        Ready-made connection string.
    cluster : str, optional
        Cluster identifier; it is passed through ``self._lookup_cluster``.
    database : str, optional
        Database name. Takes precedence over the "data_families" metadata.

    Returns
    -------
    Any
        The supplied ``connection_str`` or the value returned by
        ``self._create_connection``. A falsy value is returned when no new
        connection was built: either there is no ``query_source`` to fall
        back on, or no database could be determined and
        ``self.current_connection`` is already set.

    Notes
    -----
    ``self._cluster_uri`` is updated whenever ``self._create_connection``
    is used to build the returned value.
    Errors for a missing cluster/database are left to
    ``self._create_connection`` - presumably it raises an informative
    error in that case; confirm against its implementation.

    """
    # An explicitly supplied connection string wins over everything else.
    new_connection = kwargs.get("connection_str")
    if new_connection:
        return new_connection

    # Next preference: cluster and database given directly by the caller.
    cluster = self._lookup_cluster(kwargs.get("cluster", ""))
    database = kwargs.get("database")
    if cluster and database:
        new_connection = self._create_connection(cluster=cluster, database=database)
        self._cluster_uri = cluster

    if new_connection or not query_source:
        return new_connection

    # Fall back to the query metadata for whatever is still missing.
    metadata = query_source.metadata
    cluster = cluster or metadata.get("cluster")
    if not database:
        data_families = metadata.get("data_families")
        if isinstance(data_families, list) and data_families:
            family = data_families[0]
            # "<prefix>.<database>" is the expected form; a value with no
            # dot is not expected but is still usable as a database name.
            _, dot, suffix = family.partition(".")
            database = suffix if dot else family
        elif self.current_connection:
            # No database can be derived and a connection is already in
            # place: report "no new connection" rather than failing on an
            # index into missing/empty data_families.
            # NOTE(review): assumes callers keep the current connection
            # when this returns a falsy value - confirm.
            return new_connection

    # With no usable database and no current connection this call is
    # relied on to report the problem to the user.
    new_connection = self._create_connection(cluster=cluster, database=database)
    self._cluster_uri = cluster
    return new_connection