connectors/aoai.py (103 lines of code) (raw):
import logging
import os
import tiktoken
import time
from openai import AzureOpenAI, RateLimitError
from azure.identity import ManagedIdentityCredential, AzureCliCredential, ChainedTokenCredential, get_bearer_token_provider
MAX_RETRIES = 10 # Maximum number of retries for rate limit errors
MAX_EMBEDDINGS_MODEL_INPUT_TOKENS = 8192
MAX_GPT_MODEL_INPUT_TOKENS = 128000 # this is gpt4o max input, if using gpt35turbo use 16385
class AzureOpenAIClient:
"""
AzureOpenAIClient uses the OpenAI SDK's built-in retry mechanism with exponential backoff.
The number of retries is controlled by the MAX_RETRIES environment variable.
Delays between retries start at 0.5 seconds, doubling up to 8 seconds.
If a rate limit error occurs after retries, the client will retry once more after the retry-after-ms header duration (if the header is present).
"""
def __init__(self):
"""
Initializes the AzureOpenAI client.
"""
self.openai_service_name = os.getenv('AZURE_OPENAI_RESOURCE')
self.openai_api_base = f"https://{self.openai_service_name}.openai.azure.com"
self.openai_api_version = os.getenv('AZURE_OPENAI_API_VERSION')
token_provider = get_bearer_token_provider(
ChainedTokenCredential(
ManagedIdentityCredential(),
AzureCliCredential()
), "https://cognitiveservices.azure.com/.default"
)
self.client = AzureOpenAI(
api_version=self.openai_api_version,
azure_endpoint=self.openai_api_base,
azure_ad_token_provider=token_provider,
max_retries=MAX_RETRIES
)
def get_completion(self, prompt, max_tokens=800, retry_after=True):
one_liner_prompt = prompt.replace('\n', ' ')
logging.info(f"[aoai] Getting completion for prompt: {one_liner_prompt[:100]}")
openai_deployment = os.getenv('AZURE_OPENAI_CHATGPT_DEPLOYMENT')
# truncate prompt if needed
prompt = self._truncate_input(prompt, MAX_GPT_MODEL_INPUT_TOKENS)
try:
input_messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": f"{prompt}"}
]
response = self.client.chat.completions.create(
messages=input_messages,
model=openai_deployment,
temperature=float(os.environ.get('AZURE_OPENAI_TEMPERATURE', 0.7)),
top_p=float(os.environ.get('AZURE_OPENAI_TOP_P', 0.95)),
max_tokens=max_tokens
)
completion = response.choices[0].message.content
return completion
except RateLimitError as e:
retry_after_ms = e.response.headers.get('retry-after-ms')
if retry_after_ms:
retry_after_ms = int(retry_after_ms)
logging.info(f"[aoai] get_completion: Reached rate limit, retrying after {retry_after_ms} ms")
time.sleep(retry_after_ms / 1000)
return self.get_completion(self, prompt, retry_after=False)
else:
logging.error(f"[aoai] get_completion: Rate limit error occurred, no 'retry-after-ms' provided: {e}")
raise
except Exception as e:
logging.error(f"[aoai] get_completion: An unexpected error occurred: {e}")
raise
def get_embeddings(self, text, retry_after=True):
one_liner_text = text.replace('\n', ' ')
logging.info(f"[aoai] Getting embeddings for text: {one_liner_text[:100]}")
openai_deployment = os.getenv('AZURE_OPENAI_EMBEDDING_DEPLOYMENT')
# summarize in case it is larger than the maximum input tokens
num_tokens = GptTokenEstimator().estimate_tokens(text)
if (num_tokens > MAX_EMBEDDINGS_MODEL_INPUT_TOKENS):
prompt = f"Rewrite the text to be coherent and meaningful, reducing it to {MAX_EMBEDDINGS_MODEL_INPUT_TOKENS} tokens: {text}"
text = self.get_completion(prompt)
logging.info(f"[aoai] get_embeddings: rewriting text to fit in {MAX_EMBEDDINGS_MODEL_INPUT_TOKENS} tokens")
try:
response = self.client.embeddings.create(
input=text,
model=openai_deployment
)
embeddings = response.data[0].embedding
return embeddings
except RateLimitError as e:
retry_after_ms = e.response.headers.get('retry-after-ms')
if retry_after_ms:
retry_after_ms = int(retry_after_ms)
logging.info(f"[aoai ]get_completion: Reached rate limit, retrying after {retry_after_ms} ms")
time.sleep(retry_after_ms / 1000)
return self.get_completion(self, prompt, retry_after=False)
else:
logging.error(f"[aoai] get_completion: Rate limit error occurred, no 'retry-after-ms' provided: {e}")
raise
except Exception as e:
logging.error(f"[aoai] get_embedding: An unexpected error occurred: {e}")
raise
def _truncate_input(self, text, max_tokens):
input_tokens = GptTokenEstimator().estimate_tokens(text)
if input_tokens > max_tokens:
logging.info(f"[aoai] Input size {input_tokens} exceeded maximum token limit {max_tokens}, truncating...")
step_size = 1 # Initial step size
iteration = 0 # Iteration counter
while GptTokenEstimator().estimate_tokens(text) > max_tokens:
text = text[:-step_size]
iteration += 1
# Increase step size exponentially every 5 iterations
if iteration % 5 == 0:
step_size = min(step_size * 2, 100)
return text
class GptTokenEstimator():
GPT2_TOKENIZER = tiktoken.get_encoding("gpt2")
def estimate_tokens(self, text: str) -> int:
return len(self.GPT2_TOKENIZER.encode(text))