def _get_connection_string()

in msticpy/data/drivers/kusto_driver.py [0:0]


    def _get_connection_string(self, query_source: QuerySource = None, **kwargs):
        """Create a connection string from arguments and configuration."""
        # If the connection string is supplied as a parameter, use that
        cluster = database = None
        new_connection = kwargs.get("connection_str")
        if not new_connection:
            # try to get cluster and db from kwargs or query_source metadata
            cluster = self._lookup_cluster(kwargs.get("cluster", ""))
            database = kwargs.get("database")
            if cluster and database:
                new_connection = self._create_connection(
                    cluster=cluster, database=database
                )
                self._cluster_uri = cluster
        if not new_connection and query_source:
            # try to get cluster and db from query_source metadata
            cluster = cluster or query_source.metadata.get("cluster")
            data_families = query_source.metadata.get("data_families")
            if (
                not isinstance(data_families, list) or len(data_families) == 0
            ) and not self.current_connection:
                # call create connection so that we throw an informative error
                self._create_connection(cluster=cluster, database=database)
            if "." in data_families[0]:  # type: ignore
                _, qry_db = data_families[0].split(".", maxsplit=1)  # type: ignore
            else:
                # Not expected but we can still use a DB value with no dot
                qry_db = data_families[0]  # type: ignore
            database = database or qry_db
            new_connection = self._create_connection(cluster=cluster, database=database)
            self._cluster_uri = cluster
        return new_connection