tools/database/measures.py [15:59]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
async def _perform_search(body: Dict[str, Any], search_index: str) -> Dict[str, Any]:
    """
    Executes a search query against the specified Azure AI Search index.

    Args:
        body (dict): The JSON body for the search request.
        search_index (str): The name of the search index to query.

    Returns:
        dict: The JSON response from the search service.
    """
    search_service = os.getenv("AZURE_SEARCH_SERVICE")
    if not search_service:
        raise Exception("AZURE_SEARCH_SERVICE environment variable is not set.")
    search_api_version = os.getenv("AZURE_SEARCH_API_VERSION", "2024-07-01")

    # Build the search endpoint URL.
    search_endpoint = (
        f"https://{search_service}.search.windows.net/indexes/{search_index}/docs/search"
        f"?api-version={search_api_version}"
    )

    # Obtain an access token for the search service.
    try:
        credential = ChainedTokenCredential(
            ManagedIdentityCredential(),
            AzureCliCredential()
        )
        azure_search_scope = "https://search.azure.com/.default"
        token = credential.get_token(azure_search_scope).token
    except Exception as e:
        logging.error("Error obtaining Azure Search token.", exc_info=True)
        raise Exception("Failed to obtain Azure Search token.") from e

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(search_endpoint, headers=headers, json=body) as response:
                if response.status >= 400:
                    text = await response.text()
                    error_message = f"Status code: {response.status}. Error: {text}"
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tools/database/tables.py [22:70]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
async def _perform_search(body: Dict[str, Any], search_index: str) -> Dict[str, Any]:
    """
    Executes a search query against the specified Azure AI Search index.

    Args:
        body (dict): The JSON body for the search request.
        search_index (str): The name of the search index to query.

    Returns:
        dict: The JSON response from the search service.

    Raises:
        Exception: If the search query fails or an error occurs obtaining the token.
    """
    search_service = os.getenv("AZURE_SEARCH_SERVICE")
    if not search_service:
        raise Exception("AZURE_SEARCH_SERVICE environment variable is not set.")
    search_api_version = os.getenv("AZURE_SEARCH_API_VERSION", "2024-07-01")

    # Build the search endpoint URL.
    search_endpoint = (
        f"https://{search_service}.search.windows.net/indexes/{search_index}/docs/search"
        f"?api-version={search_api_version}"
    )

    # Obtain an access token for the search service.
    try:
        credential = ChainedTokenCredential(
            ManagedIdentityCredential(),
            AzureCliCredential()
        )
        azure_search_scope = "https://search.azure.com/.default"
        token = credential.get_token(azure_search_scope).token
    except Exception as e:
        logging.error("Error obtaining Azure Search token.", exc_info=True)
        raise Exception("Failed to obtain Azure Search token.") from e

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }

    # Perform the asynchronous HTTP POST request.
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(search_endpoint, headers=headers, json=body) as response:
                if response.status >= 400:
                    text = await response.text()
                    error_message = f"Status code: {response.status}. Error: {text}"
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



