connectors/fabric.py
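"""
Connectors for Microsoft Fabric.

Two clients are provided:
- SQLEndpointClient: connects to a Fabric SQL endpoint over ODBC using a
  service principal whose secret is stored in Azure Key Vault.
- SemanticModelClient: executes DAX queries against a Fabric (Power BI)
  semantic model through the executeQueries REST API.
"""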
import logging
import pyodbc
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from azure.identity.aio import ClientSecretCredential
from connectors.keyvault import get_secret, generate_valid_secret_name
from connectors.types import SQLEndpointConfig, SemanticModelConfig
class SQLEndpointClient:
"""
Client for connecting to Fabric SQL Endpoint using a service principal.
"""
def __init__(self, datasource_config):
# Validate and parse the configuration using Pydantic
if not isinstance(datasource_config, SQLEndpointConfig):
datasource_config = SQLEndpointConfig.model_validate(datasource_config)
self.datasource_config = datasource_config
async def create_connection(self):
"""Create and return a connection using pyodbc."""
return await self._create_sqlendpoint_connection()
async def _create_sqlendpoint_connection(self):
server = self.datasource_config.server
database = self.datasource_config.database
client_id = self.datasource_config.client_id
tenant_id = self.datasource_config.tenant_id
# Format service principal ID as client_id@tenant_id
service_principal_id = f"{client_id}@{tenant_id}"
# Retrieve the client secret from Key Vault using the datasource id
kv_secret_name = generate_valid_secret_name(f"{self.datasource_config.id}-secret")
        client_secret = await get_secret(kv_secret_name)
connection_string = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={server};"
f"DATABASE={database};"
f"UID={service_principal_id};"
f"PWD={client_secret};"
f"Authentication=ActiveDirectoryServicePrincipal"
)
try:
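            # pyodbc.connect is blocking, so run it in a worker thread to keep
            # the event loop responsive.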
connection = await asyncio.to_thread(pyodbc.connect, connection_string)
return connection
except Exception as e:
logging.error(f"[fabric] Failed to connect to the SQL endpoint with service principal: {e}")
raise
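# Minimal usage sketch for SQLEndpointClient. The configuration values below are
# placeholders (the exact fields required by SQLEndpointConfig may differ), and the
# query is illustrative only:
#
#     config = {
#         "id": "my-fabric-datasource",
#         "server": "<your-sql-endpoint>.datawarehouse.fabric.microsoft.com",
#         "database": "my_lakehouse",
#         "client_id": "<app-registration-client-id>",
#         "tenant_id": "<tenant-id>",
#     }
#     client = SQLEndpointClient(config)
#     connection = await client.create_connection()
#     cursor = connection.cursor()
#     cursor.execute("SELECT TOP (10) * FROM some_table")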
class SemanticModelClient:
"""
Client for executing DAX queries against a Fabric semantic model (Power BI)
using a service principal.
"""
def __init__(self, datasource_config):
if not isinstance(datasource_config, SemanticModelConfig):
datasource_config = SemanticModelConfig.model_validate(datasource_config)
self.datasource_config = datasource_config
async def _get_restapi_access_token(self) -> str:
"""
Obtain an access token using ClientSecretCredential.
"""
kv_secret_name = generate_valid_secret_name(f"{self.datasource_config.id}-secret")
client_secret = await get_secret(kv_secret_name)
credential = ClientSecretCredential(
tenant_id=self.datasource_config.tenant_id,
client_id=self.datasource_config.client_id,
client_secret=client_secret
)
try:
token = await credential.get_token("https://analysis.windows.net/powerbi/api/.default")
logging.info("[fabric] Access token acquired successfully for Semantic Model.")
return token.token
except Exception as e:
logging.error(f"[fabric] Failed to obtain access token for Semantic Model: {e}")
raise
finally:
await credential.close()
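    # Retry transient HTTP failures (aiohttp.ClientError, including the rate-limit
    # error re-raised below) with exponential backoff, up to 5 attempts.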
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(aiohttp.ClientError)
)
async def execute_restapi_dax_query(self, dax_query: str, user_token: str = None, impersonated_user: str = None) -> list:
"""
Execute a DAX query against the semantic model endpoint.
Returns a list of dictionaries representing the query rows.
"""
        # Prefer a caller-supplied token; otherwise acquire one with the service principal.
        access_token = user_token or await self._get_restapi_access_token()
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}"
}
# Construct the URL using the dataset from the configuration.
# url = f"https://api.powerbi.com/v1.0/myorg/groups/{self.datasource_config.workspace}/datasets/f72fb57e-80bf-4155-9c20-53ee4539e8b9/executeQueries"
url = f"https://api.powerbi.com/v1.0/myorg/datasets/{self.datasource_config.dataset}/executeQueries"
logging.info(f"[fabric] Rest API endpoint: {url}.")
body = {
"queries": [{"query": dax_query}],
"serializerSettings": {"includeNulls": True}
}
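        # Optionally run the query under an impersonated user's identity.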
if impersonated_user:
body["impersonatedUserName"] = impersonated_user
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=body) as response:
if response.status == 200:
response_json = await response.json()
results = response_json.get('results', [])
if not results:
logging.warning("[fabric] No results found in the DAX query response.")
return []
tables = results[0].get('tables', [])
if not tables:
logging.warning("[fabric] No tables found in the DAX query response.")
return []
table = tables[0]
rows = table.get('rows', [])
columns = table.get('columns', [])
col_names = [col.get("name", f"Column{i}") for i, col in enumerate(columns)]
                    # Rows may come back either as dicts keyed by column name or as
                    # positional lists; only zip positional rows with the column names.
                    if rows and isinstance(rows[0], dict):
                        results_list = rows
                    elif rows and col_names and len(col_names) == len(rows[0]):
                        results_list = [dict(zip(col_names, row)) for row in rows]
                    else:
                        results_list = rows
logging.info("[fabric] DAX query executed successfully on Semantic Model.")
return results_list
elif response.status == 429:
retry_after = response.headers.get("Retry-After")
wait_time = int(retry_after) if retry_after else 0
logging.warning(f"[fabric] Rate limited. Retrying after {wait_time} seconds.")
await asyncio.sleep(wait_time)
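                    # Re-raise as ClientError so the tenacity decorator retries the request.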
raise aiohttp.ClientError("Rate limited")
elif 400 <= response.status < 500:
error_message = await response.text()
logging.error(f"[fabric] Client error executing DAX query. Status: {response.status}, Message: {error_message}")
raise aiohttp.ClientError(f"Client error: {response.status} - {error_message}")
else:
error_message = await response.text()
logging.error(f"[fabric] Server error executing DAX query. Status: {response.status}, Message: {error_message}")
raise aiohttp.ClientError(f"Server error: {response.status}")
async def create_connection(self):
"""
For interface consistency. Semantic Model operations do not maintain a persistent connection.
"""
return self
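# Minimal usage sketch for SemanticModelClient. The configuration values and the DAX
# query below are placeholders (the exact fields required by SemanticModelConfig may
# differ):
#
#     config = {
#         "id": "my-semantic-model-datasource",
#         "client_id": "<app-registration-client-id>",
#         "tenant_id": "<tenant-id>",
#         "dataset": "<dataset-id>",
#     }
#     client = SemanticModelClient(config)
#     rows = await client.execute_restapi_dax_query("EVALUATE TOPN(10, 'Sales')")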