connectors/sqldbs.py (57 lines of code) (raw):

import asyncio import logging import pyodbc import struct from azure.identity import ManagedIdentityCredential, AzureCliCredential, ChainedTokenCredential from connectors.keyvault import get_secret, generate_valid_secret_name from connectors.types import SQLDatabaseConfig class SQLDBClient: """ Client for connecting to a Fabric SQL Database using either SQL Server authentication (with a UID and password stored in Key Vault) or Managed Identity. """ def __init__(self, datasource_config): if not isinstance(datasource_config, SQLDatabaseConfig): datasource_config = SQLDatabaseConfig.model_validate(datasource_config) self.datasource_config = datasource_config async def create_connection(self): return await self._create_sqldatabase_connection() async def _create_sqldatabase_connection(self): server = self.datasource_config.server database = self.datasource_config.database uid = self.datasource_config.uid connection_string = ( f"Driver={{ODBC Driver 18 for SQL Server}};" f"Server={server},1433;" f"Database={database};" "Encrypt=yes;" "TrustServerCertificate=no;" "Connection Timeout=30;" ) if uid: kv_secret_name = generate_valid_secret_name(f"{self.datasource_config.id}-secret") # Retrieve SQL user password from Key Vault using datasource id. pwd = await get_secret(kv_secret_name) connection_string += f"UID={uid};PWD={pwd};" logging.info("Using SQL Server authentication for SQL Database.") try: connection = await asyncio.to_thread(pyodbc.connect, connection_string) return connection except Exception as e: logging.error(f"Failed to connect to SQL Database with SQL Server authentication: {e}") raise else: # Use Azure AD token for authentication via Managed Identity. credential = ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential() ) try: token = credential.get_token("https://database.windows.net/.default") token_bytes = token.token.encode("UTF-16-LE") token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes) SQL_COPT_SS_ACCESS_TOKEN = 1256 connection = await asyncio.to_thread( pyodbc.connect, connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct} ) logging.info("Using Azure AD token authentication for SQL Database.") return connection except Exception as e: logging.error(f"Failed to connect to SQL Database with Azure AD token authentication: {e}") raise