orc/plugins/Retrieval/native_function.py (218 lines of code) (raw):

from shared.util import get_secret, get_aoai_config, extract_text_from_html, get_possitive_int_or_default # from semantic_kernel.skill_definition import sk_function from openai import AzureOpenAI from semantic_kernel.functions import kernel_function from tenacity import retry, wait_random_exponential, stop_after_attempt import logging import os import time import sys from typing import Dict if sys.version_info >= (3, 9): from typing import Annotated else: from typing_extensions import Annotated from azure.cognitiveservices.search.customsearch import CustomSearchClient from msrest.authentication import CognitiveServicesCredentials from azure.identity.aio import ManagedIdentityCredential, AzureCliCredential, ChainedTokenCredential import aiohttp import asyncio import os import logging # Azure OpenAI Integration Settings AZURE_OPENAI_EMBEDDING_MODEL = os.environ.get("AZURE_OPENAI_EMBEDDING_MODEL") AZURE_OPENAI_CHATGPT_MODEL = os.environ.get("AZURE_OPENAI_CHATGPT_MODEL") AZURE_OPENAI_CHATGPT_DEPLOYMENT = os.environ.get("AZURE_OPENAI_CHATGPT_DEPLOYMENT") AZURE_OPENAI_RESOURCE = os.environ.get("AZURE_OPENAI_RESOURCE") AZURE_OPENAI_TEMPERATURE = os.getenv("AZURE_OPENAI_TEMPERATURE", "0.17") AZURE_OPENAI_APIVERSION = os.environ.get("AZURE_OPENAI_APIVERSION") AZURE_OPENAI_EMBEDDING_DEPLOYMENT = os.environ.get("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") AZURE_OPENAI_EMBEDDING_APIVERSION = os.environ.get("AZURE_OPENAI_EMBEDDING_APIVERSION") # Azure Search Integration Settings AZURE_SEARCH_SERVICE = os.environ.get("AZURE_SEARCH_SERVICE") AZURE_SEARCH_INDEX = os.environ.get("AZURE_SEARCH_INDEX") AZURE_SEARCH_API_VERSION = os.environ.get("AZURE_SEARCH_API_VERSION", "2024-07-01") if AZURE_SEARCH_API_VERSION < '2023-10-01-Preview': # query is using vectorQueries that requires at least 2023-10-01-Preview'. AZURE_SEARCH_API_VERSION = '2023-11-01' AZURE_SEARCH_TOP_K = os.environ.get("AZURE_SEARCH_TOP_K") or "3" AZURE_SEARCH_USE_SEMANTIC = os.environ.get("AZURE_SEARCH_USE_SEMANTIC") or "false" AZURE_SEARCH_APPROACH = os.environ.get("AZURE_SEARCH_APPROACH") or "hybrid" AZURE_SEARCH_OYD_USE_SEMANTIC_SEARCH = os.environ.get("AZURE_SEARCH_OYD_USE_SEMANTIC_SEARCH") or "false" AZURE_SEARCH_OYD_USE_SEMANTIC_SEARCH = True if AZURE_SEARCH_OYD_USE_SEMANTIC_SEARCH == "true" else False AZURE_SEARCH_SEMANTIC_SEARCH_CONFIG = os.environ.get("AZURE_SEARCH_SEMANTIC_SEARCH_CONFIG") or "my-semantic-config" AZURE_SEARCH_ENABLE_IN_DOMAIN = os.environ.get("AZURE_SEARCH_ENABLE_IN_DOMAIN") or "true" AZURE_SEARCH_ENABLE_IN_DOMAIN = True if AZURE_SEARCH_ENABLE_IN_DOMAIN == "true" else False AZURE_SEARCH_CONTENT_COLUMNS = os.environ.get("AZURE_SEARCH_CONTENT_COLUMNS") or "content" AZURE_SEARCH_FILENAME_COLUMN = os.environ.get("AZURE_SEARCH_FILENAME_COLUMN") or "filepath" AZURE_SEARCH_TITLE_COLUMN = os.environ.get("AZURE_SEARCH_TITLE_COLUMN") or "title" AZURE_SEARCH_URL_COLUMN = os.environ.get("AZURE_SEARCH_URL_COLUMN") or "url" # Bing Search Integration Settings BING_SEARCH_TOP_K = os.environ.get("BING_SEARCH_TOP_K") or "3" BING_CUSTOM_SEARCH_URL = "https://api.bing.microsoft.com/v7.0/custom/search?" BING_SEARCH_MAX_TOKENS = os.environ.get("BING_SEARCH_MAX_TOKENS") or "1000" VECTOR_SEARCH_APPROACH="vector" TERM_SEARCH_APPROACH="term" HYBRID_SEARCH_APPROACH="hybrid" # General Settings TOP_K_DEFAULT = 3 MAX_TOKENS_DEFAULT = 1000 # Logging Settings LOGLEVEL = os.environ.get('LOGLEVEL', 'DEBUG').upper() logging.basicConfig(level=LOGLEVEL) # APIM Settings APIM_ENABLED = os.environ.get('APIM_ENABLED', 'false').lower() == 'true' APIM_BING_CUSTOM_SEARCH_URL = os.environ.get('APIM_BING_CUSTOM_SEARCH_URL', "") + "/search?" APIM_AZURE_SEARCH_URL = os.environ.get('APIM_AZURE_SEARCH_URL', "") @retry(wait=wait_random_exponential(min=2, max=60), stop=stop_after_attempt(6), reraise=True) # Function to generate embeddings for title and content fields, also used for query embeddings async def generate_embeddings(text,apim_key=None): embeddings_config = await get_aoai_config(AZURE_OPENAI_EMBEDDING_MODEL) if APIM_ENABLED: client = AzureOpenAI( api_version=embeddings_config['api_version'], azure_endpoint=embeddings_config['endpoint'], api_key=apim_key ) else: client = AzureOpenAI( api_version=embeddings_config['api_version'], azure_endpoint=embeddings_config['endpoint'], azure_ad_token=embeddings_config['api_key'], ) embeddings = client.embeddings.create(input=[text], model=embeddings_config['deployment']).data[0].embedding return embeddings class Retrieval: @kernel_function( description="Search a knowledge base for sources to ground and give context to answer a user question. Return sources.", name="VectorIndexRetrieval", ) async def VectorIndexRetrieval( self, input: Annotated[str, "The user question"], apim_key: Annotated[str, "The key to access the apim endpoint"], # client_principal_id: Annotated[str, "The user client principal id"] security_ids: Annotated[str, "Comma separated list string with user security ids"] ) -> Annotated[str, "the output is a string with the search results"]: search_results = [] search_query = input # search_filter = f"security_id/any(g:search.in(g,'{client_principal_id}'))" search_filter = ( f"metadata_security_id/any(g:search.in(g, '{security_ids}')) " f"or not metadata_security_id/any()" ) try: async with ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential() ) as credential: start_time = time.time() logging.info(f"[sk_retrieval] generating question embeddings. search query: {search_query}") embeddings_query = await generate_embeddings(search_query,apim_key=apim_key) response_time = round(time.time() - start_time, 2) logging.info(f"[sk_retrieval] finished generating question embeddings. {response_time} seconds") azureSearchKey =await credential.get_token("https://search.azure.com/.default") azureSearchKey = azureSearchKey.token logging.info(f"[sk_retrieval] querying azure ai search. search query: {search_query}") # prepare body body = { "select": "title, content, url, filepath, chunk_id", "top": AZURE_SEARCH_TOP_K } if AZURE_SEARCH_APPROACH == TERM_SEARCH_APPROACH: body["search"] = search_query elif AZURE_SEARCH_APPROACH == VECTOR_SEARCH_APPROACH: body["vectorQueries"] = [{ "kind": "vector", "vector": embeddings_query, "fields": "contentVector", "k": int(AZURE_SEARCH_TOP_K) }] elif AZURE_SEARCH_APPROACH == HYBRID_SEARCH_APPROACH: body["search"] = search_query body["vectorQueries"] = [{ "kind": "vector", "vector": embeddings_query, "fields": "contentVector", "k": int(AZURE_SEARCH_TOP_K) }] if AZURE_SEARCH_USE_SEMANTIC == "true" and AZURE_SEARCH_APPROACH != VECTOR_SEARCH_APPROACH: body["queryType"] = "semantic" body["semanticConfiguration"] = AZURE_SEARCH_SEMANTIC_SEARCH_CONFIG body["filter"] = search_filter logging.debug(f"[ai_search] search filter: {search_filter}") headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {azureSearchKey}' } if APIM_ENABLED: headers = { 'Content-Type': 'application/json', 'api-key': apim_key, '$top': AZURE_SEARCH_TOP_K } search_endpoint = f"{APIM_AZURE_SEARCH_URL}/docs?api-version={AZURE_SEARCH_API_VERSION}" else: headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {azureSearchKey}' } search_endpoint = f"https://{AZURE_SEARCH_SERVICE}.search.windows.net/indexes/{AZURE_SEARCH_INDEX}/docs/search?api-version={AZURE_SEARCH_API_VERSION}" start_time = time.time() async with aiohttp.ClientSession() as session: if APIM_ENABLED: async with session.get(search_endpoint, headers=headers, json=body) as response: status_code = response.status text=await response.text() json=await response.json() if status_code >= 400: error_on_search = True error_message = f'Status code: {status_code}.' if text != "": error_message += f" Error: {response.text}." logging.error(f"[sk_retrieval] error {status_code} when searching documents. {error_message}") else: if json['value']: logging.info(f"[sk_retrieval] {len(json['value'])} documents retrieved") for doc in json['value']: search_results.append(doc['filepath'] + ": " + doc['content'].strip() + "\n") else: logging.info(f"[sk_retrieval] No documents retrieved") else: async with session.post(search_endpoint, headers=headers, json=body) as response: status_code = response.status text=await response.text() json=await response.json() if status_code >= 400: error_on_search = True error_message = f'Status code: {status_code}.' if text != "": error_message += f" Error: {response.text}." logging.error(f"[sk_retrieval] error {status_code} when searching documents. {error_message}") else: if json['value']: logging.info(f"[sk_retrieval] {len(json['value'])} documents retrieved") for doc in json['value']: search_results.append(doc['filepath'] + ": " + doc['content'].strip() + "\n") else: logging.info(f"[sk_retrieval] No documents retrieved") response_time = round(time.time() - start_time, 2) logging.info(f"[sk_retrieval] finished querying azure ai search. {response_time} seconds") except Exception as e: error_message = str(e) logging.error(f"[sk_retrieval] error when getting the answer {error_message}") sources = ' '.join(search_results) return sources @kernel_function( description="Search bing for sources to ground and give context to answer a user question. Return sources.", name="BingRetrieval", ) async def BingRetrieval( self, input: Annotated[str, "The user question"], bing_api_key: Annotated[str, "The key to access the bing search"], bing_custom_config_id: Annotated[str, "The custom config id to access the bing search"] ) -> Annotated[str, "the output is a string with the search results"]: bing_custom_config_id = await get_secret('bingCustomConfigId') if(APIM_ENABLED): endpoint=APIM_BING_CUSTOM_SEARCH_URL else: endpoint=BING_CUSTOM_SEARCH_URL client = CustomSearchClient(endpoint=endpoint, credentials=CognitiveServicesCredentials(bing_api_key)) start_time = time.time() web_data = client.custom_instance.search(query=input, custom_config=bing_custom_config_id, count=BING_SEARCH_TOP_K) bing_sources = "" async with aiohttp.ClientSession() as session: if web_data.web_pages and hasattr(web_data.web_pages, 'value'): tasks= [extract_text_from_html(web,session) for web in web_data.web_pages.value] results = await asyncio.gather(*tasks) for result in results: bing_sources += result[:get_possitive_int_or_default(BING_SEARCH_MAX_TOKENS, 1000)] logging.info(f"[sk_retrieval] finished querying bing search. {time.time()-start_time} seconds") return bing_sources