util/generate_user_token.py (72 lines of code) (raw):

import os import msal import logging import urllib.parse import threading import time import asyncio from wsgiref.simple_server import make_server from dotenv import load_dotenv from connectors.cosmosdb import CosmosDBClient from connectors.keyvault import get_secret, generate_valid_secret_name logging.basicConfig(level=logging.INFO) GREEN_BOLD = "\033[1;32m" YELLOW_BOLD = "\033[1;33m" RESET_COLOR = "\033[0m" def start_local_server(port, result_container): def app(environ, start_response): query = environ.get("QUERY_STRING", "") params = urllib.parse.parse_qs(query) if "code" in params and "state" in params: result_container["query_params"] = {k: v[0] for k, v in params.items()} response_body = b"Authentication complete. You can close this window." else: response_body = b"Authorization code or state not found." start_response("200 OK", [("Content-Type", "text/plain")]) return [response_body] server = make_server("localhost", port, app) threading.Thread(target=server.serve_forever, daemon=True).start() return server def get_user_token(client_id, client_secret, tenant_id): authority = f"https://login.microsoftonline.com/{tenant_id}" scopes = ["https://analysis.windows.net/powerbi/api/Dataset.Read.All"] redirect_uri = "http://localhost:8000" app = msal.ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret) auth_flow = app.initiate_auth_code_flow(scopes, redirect_uri=redirect_uri) print(f"{GREEN_BOLD}Please navigate to the following URL to sign in:{RESET_COLOR}") print(auth_flow["auth_uri"]) result_container = {} server = start_local_server(8000, result_container) timeout = 120 start_time = time.time() while "query_params" not in result_container and (time.time() - start_time) < timeout: time.sleep(1) server.shutdown() if "query_params" not in result_container: raise Exception("Timeout waiting for the authorization code.") result = app.acquire_token_by_auth_code_flow(auth_flow, result_container["query_params"]) if "access_token" in result: return result["access_token"] else: raise Exception("Failed to obtain access token: " + str(result.get("error_description"))) async def main(): load_dotenv() # Get datasource config datasource = os.getenv("FABRIC_DATASOURCE", None) cosmosdb = CosmosDBClient() datasources_container = os.environ.get('DATASOURCES_CONTAINER', 'datasources') datasource_config = await cosmosdb.get_document(datasources_container, datasource) if not datasource_config: print(f"Could not read datasource configuration for {datasource} from CosmosDB.") exit(1) client_id = datasource_config.get("client_id") tenant_id = datasource_config.get("tenant_id") kv_secret_name = generate_valid_secret_name(f"{datasource_config.get('id')}-secret") # Get secret using the async function client_secret = await get_secret(kv_secret_name) try: token = get_user_token(client_id, client_secret, tenant_id) print(f"{GREEN_BOLD}Access token obtained:{RESET_COLOR}") print(f"{YELLOW_BOLD}{token}{RESET_COLOR}") except Exception as e: logging.error(f"Error obtaining access token: {e}") if __name__ == "__main__": asyncio.run(main())