shared/util.py (334 lines of code) (raw):

# utility functions import re import json import logging import os import tiktoken import time import urllib.parse from azure.cosmos.aio import CosmosClient as AsyncCosmosClient from azure.keyvault.secrets.aio import SecretClient as AsyncSecretClient from azure.identity.aio import ManagedIdentityCredential, AzureCliCredential, ChainedTokenCredential from tenacity import retry, wait_random_exponential, stop_after_attempt import semantic_kernel as sk from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion from bs4 import BeautifulSoup import aiohttp # logging level logging.getLogger('azure').setLevel(logging.WARNING) LOGLEVEL = os.environ.get('LOGLEVEL', 'DEBUG').upper() logging.basicConfig(level=LOGLEVEL) # Env variables AZURE_OPENAI_TEMPERATURE = os.environ.get("AZURE_OPENAI_TEMPERATURE") or "0.17" AZURE_OPENAI_TOP_P = os.environ.get("AZURE_OPENAI_TOP_P") or "0.27" AZURE_OPENAI_RESP_MAX_TOKENS = os.environ.get("AZURE_OPENAI_MAX_TOKENS") or "1000" AZURE_OPENAI_LOAD_BALANCING = os.environ.get("AZURE_OPENAI_LOAD_BALANCING") or "false" AZURE_OPENAI_LOAD_BALANCING = True if AZURE_OPENAI_LOAD_BALANCING.lower() == "true" else False AZURE_OPENAI_CHATGPT_MODEL = os.environ.get("AZURE_OPENAI_CHATGPT_MODEL") AZURE_OPENAI_EMBEDDING_MODEL = os.environ.get("AZURE_OPENAI_EMBEDDING_MODEL") ORCHESTRATOR_MESSAGES_LANGUAGE = os.environ.get("ORCHESTRATOR_MESSAGES_LANGUAGE") or "en" AZURE_DB_ID = os.environ.get("AZURE_DB_ID") AZURE_DB_NAME = os.environ.get("AZURE_DB_NAME") AZURE_DB_URI = f"https://{AZURE_DB_ID}.documents.azure.com:443/" BING_RETRIEVAL = os.environ.get("BING_RETRIEVAL") or "true" BING_RETRIEVAL = True if BING_RETRIEVAL.lower() == "true" else False SEARCH_RETRIEVAL = os.environ.get("SEARCH_RETRIEVAL") or "true" SEARCH_RETRIEVAL = True if SEARCH_RETRIEVAL.lower() == "true" else False RETRIEVAL_PRIORITY = os.environ.get("RETRIEVAL_PRIORITY") or "search" SECURITY_HUB_CHECK = os.environ.get("SECURITY_HUB_CHECK") or "false" SECURITY_HUB_CHECK = True if SECURITY_HUB_CHECK.lower() == "true" else False APIM_ENABLED = os.environ.get("APIM_ENABLED") or "false" APIM_ENABLED = True if APIM_ENABLED.lower() == "true" else False model_max_tokens = { 'gpt-35-turbo': 4096, 'gpt-35-turbo-16k': 16384, 'gpt-4': 8192, 'gpt-4-32k': 32768, 'gpt-4o': 8192 } ########################################################## # KEY VAULT ########################################################## async def get_secret(secretName): keyVaultName = os.environ["AZURE_KEY_VAULT_NAME"] KVUri = f"https://{keyVaultName}.vault.azure.net" async with ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential()) as credential: async with AsyncSecretClient(vault_url=KVUri, credential=credential) as client: retrieved_secret = await client.get_secret(secretName) value = retrieved_secret.value # Consider logging the elapsed_time or including it in the return value if needed return value ########################################################## # HISTORY FUNCTIONS ########################################################## def get_chat_history_as_text(history, include_last_turn=True, approx_max_tokens=1000): history_text = "" if len(history) == 0: return history_text for h in reversed(history if include_last_turn else history[:-1]): history_text = f"{h['role']}:" + h["content"] + "\n" + history_text if len(history_text) > approx_max_tokens*4: break return history_text def get_chat_history_as_messages(history, include_previous_questions=True, include_last_turn=True, approx_max_tokens=1000): history_list = [] if len(history) == 0: return history_list for h in reversed(history if include_last_turn else history[:-1]): history_item = {"role": h["role"], "content": h["content"]} if "function_call" in h: history_item.update({"function_call": h["function_call"]}) if "name" in h: history_item.update({"name": h["name"]}) history_list.insert(0, history_item) if len(history_list) > approx_max_tokens*4: break # remove previous questions if needed if not include_previous_questions: new_list = [] for idx, item in enumerate(history_list): # keep only assistant messages and the last message # obs: if include_last_turn is True, the last user message is also kept if item['role'] == 'assistant' or idx == len(history_list)-1: new_list.append(item) history_list = new_list return history_list ########################################################## # GPT FUNCTIONS ########################################################## def number_of_tokens(messages, model): prompt = json.dumps(messages) encoding = tiktoken.encoding_for_model(model.replace('gpt-35-turbo','gpt-3.5-turbo')) num_tokens = len(encoding.encode(prompt)) return num_tokens def truncate_to_max_tokens(text, extra_tokens, model): max_tokens = model_max_tokens[model] - extra_tokens tokens_allowed = max_tokens - number_of_tokens(text, model=model) while tokens_allowed < int(AZURE_OPENAI_RESP_MAX_TOKENS) and len(text) > 0: text = text[:-1] tokens_allowed = max_tokens - number_of_tokens(text, model=model) return text # reduce messages to fit in the model's max tokens def optmize_messages(chat_history_messages, model): messages = chat_history_messages # check each get_sources function message and reduce its size to fit into the model's max tokens for idx, message in enumerate(messages): if message['role'] == 'function' and message['name'] == 'get_sources': # top tokens to the max tokens allowed by the model sources = json.loads(message['content'])['sources'] tokens_allowed = model_max_tokens[model] - number_of_tokens(json.dumps(messages), model=model) while tokens_allowed < int(AZURE_OPENAI_RESP_MAX_TOKENS) and len(sources) > 0: sources = sources[:-1] content = json.dumps({"sources": sources}) messages[idx]['content'] = content tokens_allowed = model_max_tokens[model] - number_of_tokens(json.dumps(messages), model=model) return messages @retry(wait=wait_random_exponential(min=20, max=60), stop=stop_after_attempt(6), reraise=True) async def call_semantic_function(kernel, function, arguments): function_result = await kernel.invoke(function, arguments) return function_result @retry(wait=wait_random_exponential(min=2, max=60), stop=stop_after_attempt(6), reraise=True) async def chat_complete(messages, functions, params={}, function_call='auto',apim_key=None): """ Return assistant chat response based on user query. Assumes existing list of messages """ oai_config = await get_aoai_config(AZURE_OPENAI_CHATGPT_MODEL) messages = optmize_messages(messages, AZURE_OPENAI_CHATGPT_MODEL) url = f"{oai_config['endpoint']}/openai/deployments/{oai_config['deployment']}/chat/completions?api-version={oai_config['api_version']}" if(APIM_ENABLED): headers = { "Content-Type": "application/json", "api-key": apim_key } else: headers = { "Content-Type": "application/json", "Authorization": "Bearer "+ oai_config['api_key'] } data = { "messages": messages, "max_tokens": params.get("max_tokens", int(AZURE_OPENAI_RESP_MAX_TOKENS)) } if not function_call == 'none' and len(functions) > 0: data["functions"] = functions data["function_call"] = function_call if function_call == 'auto': data['temperature'] = 0 else: data['temperature'] = params.get("temperature", float(AZURE_OPENAI_TEMPERATURE)) data['top_p'] = params.get("top_p", float(AZURE_OPENAI_TOP_P)) start_time = time.time() async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, data=json.dumps(data)) as request: response=await request.json() response_time = round(time.time() - start_time,2) logging.info(f"[util__module] called chat completion api in {response_time:.6f} seconds") return response ########################################################## # FORMATTING FUNCTIONS ########################################################## # enforce answer format to the desired format (html, markdown, none) def format_answer(answer, format= 'none'): formatted_answer = answer if format == 'html': # Convert bold syntax (**text**) to HTMLFhtml formatted_answer = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', formatted_answer) # Convert italic syntax (*text*) to HTML formatted_answer = re.sub(r'\*(.*?)\*', r'<em>\1</em>', formatted_answer) # Return the converted text elif format == 'markdown': formatted_answer = answer # TODO elif format == 'none': formatted_answer = answer # TODO return formatted_answer # replace [doc1] [doc2] [doc3] with the corresponding filepath def replace_doc_ids_with_filepath(answer, citations): for i, citation in enumerate(citations): filepath = urllib.parse.quote(citation['filepath']) answer = answer.replace(f"[doc{i+1}]", f"[{filepath}]") return answer def escape_xml_characters(input_string): """ Escapes special characters in a string for XML. Args: input_string (str): The string to escape. Returns: str: The escaped string. """ # Mapping of special characters to their escaped versions escape_mappings = { "&": "&amp;", "\"": "&quot;", "'": "&apos;", "<": "&lt;", ">": "&gt;" } # Replace each special character with its escaped version for key, value in escape_mappings.items(): input_string = input_string.replace(key, value) return input_string ########################################################## # MESSAGES FUNCTIONS ########################################################## def get_message(message): if ORCHESTRATOR_MESSAGES_LANGUAGE.startswith("pt"): messages_file = "orc/messages/pt.json" elif ORCHESTRATOR_MESSAGES_LANGUAGE.startswith("es"): messages_file = "orc/messages/es.json" else: messages_file = "orc/messages/en.json" with open(messages_file, 'r') as f: json_data = f.read() messages_dict = json.loads(json_data) return messages_dict[message] def get_last_messages(messages, n): """ This function returns the last n*2 messages from the provided list, excluding the last message. Parameters: messages (list): A list of messages. n (int): The number of pairs of messages to return. Returns: list: A list containing the last n*2 messages, excluding the last message. If the input list is empty or contains only one message, an empty list is returned. Note: This function assumes that a conversation consists of pairs of messages (a message and a response). Therefore, it returns n*2 messages to get n pairs of messages. """ # Check if messages is not empty and has more than one element if messages and len(messages) > 1: # Get the last N*2 messages (N pairs), excluding the last message last_conversations = messages[-(n*2+1):-1] return last_conversations else: return [] ########################################################## # SEMANTIC KERNEL ########################################################## def load_sk_plugin(name, oai_config): kernel = sk.Kernel() kernel.add_chat_service("chat_completion", AzureChatCompletion(oai_config['deployment'], oai_config['endpoint'], oai_config['api_key'], ad_auth=True)) plugin = kernel.import_semantic_skill_from_directory("orc/plugins", name) native_functions = kernel.import_native_skill_from_directory("orc/plugins", name) plugin.update(native_functions) return plugin async def create_kernel(service_id='aoai_chat_completion',apim_key=None): kernel = sk.Kernel() chatgpt_config =await get_aoai_config(AZURE_OPENAI_CHATGPT_MODEL) if APIM_ENABLED: kernel.add_service( AzureChatCompletion( service_id=service_id, deployment_name=chatgpt_config['deployment'], endpoint=chatgpt_config['endpoint'], api_version=chatgpt_config['api_version'], api_key=apim_key ) ) else: kernel.add_service( AzureChatCompletion( service_id=service_id, deployment_name=chatgpt_config['deployment'], endpoint=chatgpt_config['endpoint'], api_version=chatgpt_config['api_version'], ad_token=chatgpt_config['api_key'] ) ) return kernel def get_usage_tokens(function_result, token_type='total'): metadata = function_result.metadata['metadata'] usage_tokens = 0 if token_type == 'completion': usage_tokens = sum(item['usage'].completion_tokens for item in metadata if 'usage' in item) elif token_type == 'prompt': usage_tokens = sum(item['usage'].prompt_tokens for item in metadata if 'usage' in item) elif token_type == 'total': usage_tokens = sum(item['usage'].total_tokens for item in metadata if 'usage' in item) return usage_tokens ########################################################## # AOAI FUNCTIONS ########################################################## def get_list_from_string(string): result = string.split(',') result = [item.strip() for item in result] return result async def get_aoai_config(model): if APIM_ENABLED: if model in ('gpt-35-turbo', 'gpt-35-turbo-16k', 'gpt-4', 'gpt-4-32k','gpt-4o'): deployment = os.environ.get("AZURE_OPENAI_CHATGPT_DEPLOYMENT") or "gpt-4o" elif model == AZURE_OPENAI_EMBEDDING_MODEL: deployment = os.environ.get("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") else: raise Exception(f"Model {model} not supported. Check if you have the correct env variables set.") result = { "endpoint": os.environ.get("APIM_AZURE_OPENAI_ENDPOINT"), "deployment": deployment, "model": model, # ex: 'gpt-35-turbo-16k', 'gpt-4', 'gpt-4-32k', 'gpt-4o' "api_version": os.environ.get("AZURE_OPENAI_API_VERSION") or "2024-03-01-preview", } else: resource = await get_next_resource(model) async with ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential()) as credential: token = await credential.get_token("https://cognitiveservices.azure.com/.default") if model in ('gpt-35-turbo', 'gpt-35-turbo-16k', 'gpt-4', 'gpt-4-32k','gpt-4o'): deployment = os.environ.get("AZURE_OPENAI_CHATGPT_DEPLOYMENT") or "gpt-4o" elif model == AZURE_OPENAI_EMBEDDING_MODEL: deployment = os.environ.get("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") else: raise Exception(f"Model {model} not supported. Check if you have the correct env variables set.") result = { "resource": resource, "endpoint": f"https://{resource}.openai.azure.com", "deployment": deployment, "model": model, # ex: 'gpt-35-turbo-16k', 'gpt-4', 'gpt-4-32k', 'gpt-4o' "api_version": os.environ.get("AZURE_OPENAI_API_VERSION") or "2024-03-01-preview", "api_key": token.token } return result async def get_next_resource(model): # define resource resources = os.environ.get("AZURE_OPENAI_RESOURCE") resources = get_list_from_string(resources) if not AZURE_OPENAI_LOAD_BALANCING or model == AZURE_OPENAI_EMBEDDING_MODEL: return resources[0] else: start_time = time.time() async with ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential()) as credential: async with AsyncCosmosClient(AZURE_DB_URI, credential) as db_client: db = db_client.get_database_client(database=AZURE_DB_NAME) container = db.get_container_client('models') try: keyvalue = await container.read_item(item=model, partition_key=model) # check if there's an update in the resource list and update cache if set(keyvalue["resources"]) != set(resources): keyvalue["resources"] = resources except Exception: logging.info(f"[util__module] get_next_resource: first time execution (keyvalue store with '{model}' id does not exist, creating a new one).") keyvalue = { "id": model, "resources": resources } keyvalue = await container.create_item(body=keyvalue) resources = keyvalue["resources"] # get the first resource and move it to the end of the list resource = resources.pop(0) resources.append(resource) # update cache keyvalue["resources"] = resources await container.replace_item(item=model, body=keyvalue) response_time = round(time.time() - start_time, 2) logging.info(f"[util__module] get_next_resource: model '{model}' resource {resource}. {response_time} seconds") return resource ########################################################## # OTHER FUNCTIONS ########################################################## async def get_blocked_list(): blocked_list = [] async with ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential()) as credential: async with AsyncCosmosClient(AZURE_DB_URI, credential) as db_client: db = db_client.get_database_client(database=AZURE_DB_NAME) container = db.get_container_client('guardrails') try: key_value = await container.read_item(item='blocked_list', partition_key='blocked_list') blocked_list = key_value["blocked_words"] blocked_list = [word.lower() for word in blocked_list] except Exception as e: logging.info(f"[util__module] get_blocked_list: no blocked words list (keyvalue store with 'blocked_list' id does not exist).") return blocked_list async def extract_text_from_html(web,session): async with session.get(web.url) as html_response: try: html_response.raise_for_status() text=await html_response.text() soup = BeautifulSoup(text, 'html.parser') for tag in soup.find_all('header'): tag.decompose() for tag in soup.find_all('footer'): tag.decompose() for tag in soup.find_all('form'): tag.decompose() # Extract visible text from the HTML texts = soup.stripped_strings visible_text = ' '.join(texts) return visible_text except Exception as e: logging.error(f"Failed to extract text from url {web.url}, using snipet from bing: {e}") return web.snippet def get_possitive_int_or_default(var, default_value): try: var = int(var) if var < 0: var = default_value except: var = default_value return var