Autogen_v0.4/rag_agent/search_helper.py (382 lines of code) (raw):
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
SimpleField,
SearchFieldDataType,
SearchableField,
SearchField,
VectorSearch,
HnswAlgorithmConfiguration,
VectorSearchProfile,
SemanticConfiguration,
SemanticPrioritizedFields,
SemanticField,
SemanticSearch,
SearchIndex,
AzureOpenAIVectorizer,
AzureOpenAIParameters
)
import json
from dotenv import load_dotenv
from pathlib import Path
import os
import requests
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.search.documents.models import VectorizableTextQuery
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.core.credentials import AzureKeyCredential
import os
import openai_helper
from datetime import datetime
import uuid
from typing import List
from dotenv import load_dotenv
load_dotenv()
# The following variables from your .env file are used in this notebook
azure_search_endpoint = os.environ["AZURE_SEARCH_ENDPOINT"]
credential = AzureKeyCredential(os.getenv("AZURE_SEARCH_API_KEY", "")) if len(os.getenv("AZURE_SEARCH_API_KEY", "")) > 0 else DefaultAzureCredential()
index_name = "aml_index_2" #os.getenv("AZURE_SEARCH_INDEX", "vectest")
azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT","")
azure_openai_key = os.getenv("AZURE_OPENAI_KEY", "") if len(os.getenv("AZURE_OPENAI_KEY", "")) > 0 else None
azure_openai_deployment_name = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o")
azure_openai_embedding__large_deployment = os.getenv("AZURE_OPENAI_3_LARGE_EMBEDDING_DEPLOYMENT", "text-embedding-3-large")
azure_openai_embedding__small_deployment = os.getenv("AZURE_OPENAI_3_LARGE_EMBEDDING_DEPLOYMENT", "text-embedding-3-small")
azure_openai_embedding_large_dimensions = int(os.getenv("AZURE_OPENAI_EMBEDDING_LARGE_DIMENSIONS", 3072))
azure_openai_embedding_small_dimensions = int(os.getenv("AZURE_OPENAI_EMBEDDING_SMALLDIMENSIONS", 1536))
embedding_model_name = os.getenv("AZURE_OPENAI_3_LARGE_EMBEDDING_DEPLOYMENT", "text-embedding-3-large")
azure_openai_api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-06-01")
azure_document_intelligence_endpoint = os.getenv("AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT", "https://document-intelligence.api.cognitive.microsoft.com/")
azure_document_intelligence_key = os.getenv("AZURE_DOCUMENT_INTELLIGENCE_KEY", "")
# print all the above
#print(azure_search_endpoint)
#print(credential)
#print(index_name)
#print(azure_openai_endpoint)
#print(azure_openai_embedding__large_deployment)
#print(azure_openai_embedding_large_dimensions)
#print(azure_openai_embedding_small_dimensions)
#print(embedding_model_name)
#print(azure_openai_api_version)
#print(azure_document_intelligence_endpoint)
doc_intelli_credential = AzureKeyCredential(azure_document_intelligence_key)
document_intelligence_client = DocumentIntelligenceClient(azure_document_intelligence_endpoint, doc_intelli_credential)
index_client = SearchIndexClient(
endpoint=azure_search_endpoint, credential=credential)
def create_index(index_name: str, analyzer_name: str = "en.microsoft", language_suffix: str = "en"):
index_schema = {
"name": index_name,
"fields": [
{
"name": "id",
"type": "Edm.String",
"key": True,
"sortable": True,
"filterable": True,
"facetable": True
},
{
"name": "docName",
"type": "Edm.String",
"searchable": True
},
{
"name": "pageNumber",
"type": "Edm.String",
"searchable": True
},
{
"name": f"title_{language_suffix}",
"type": "Edm.String",
"analyzer": analyzer_name,
"searchable": True
},
{
"name": f"content_{language_suffix}",
"type": "Edm.String",
"analyzer": analyzer_name,
"searchable": True
},
{
"name": f"category_{language_suffix}",
"type": "Collection(Edm.String)",
"analyzer": analyzer_name,
"filterable": True,
"searchable": True
},
{
"name": f"tags_{language_suffix}",
"type": "Collection(Edm.String)",
"analyzer": analyzer_name,
"filterable": True,
"searchable": True
},
{
"name": "lastUpdated",
"type": "Edm.DateTimeOffset"
},
{
"name": "titleVector",
"type": "Collection(Edm.Single)",
"searchable": True,
"dimensions": 1536,
"vectorSearchProfile": "amlHnswProfile",
},
{
"name": "contentVector",
"type": "Collection(Edm.Single)",
"searchable": True,
"dimensions": 3072,
"vectorSearchProfile": "amlHnswProfile",
},
{
"name": "categoryVector",
"type": "Collection(Edm.Single)",
"searchable": True,
"dimensions": 1536,
"vectorSearchProfile": "amlHnswProfile",
},
{
"name": "tagsVector",
"type": "Collection(Edm.Single)",
"searchable": True,
"dimensions": 1536,
"vectorSearchProfile": "amlHnswProfile",
}
],
"scoringProfiles": [
{
"name": "tagsBoost",
"text": {
"weights": {
f"tags_{language_suffix}": 5
}
},
"functions": []
},
{
"name": "newAndLatest",
"functionAggregation": "sum",
"functions": [
{
"fieldName": "lastUpdated",
"interpolation": "quadratic",
"type": "freshness",
"boost": 10,
"freshness": {
"boostingDuration": "P365D"
}
}
]
}
],
"suggesters": [
{
"name": "sg",
"searchMode": "analyzingInfixMatching",
"sourceFields": [f"title_{language_suffix}"]
}
],
"vectorSearch": {
"algorithms": [
{
"name": "amlHnsw",
"kind": "hnsw",
"hnswParameters": {
"m": 4,
"metric": "cosine"
}
}
],
"profiles": [
{
"name": "amlHnswProfile",
"algorithm": "amlHnsw",
"vectorizer": "amlVectorizer"
}
],
"vectorizers": [
{
"name":"amlVectorizer",
"kind":"azureOpenAI",
"azureOpenAIParameters": {
"resourceUri": azure_openai_endpoint,
"deploymentId": azure_openai_embedding__large_deployment,
"modelName": embedding_model_name,
"apiKey": azure_openai_key
}
}
]
},
"semantic": {
"configurations": [
{
"name": "aml-semantic-config",
"prioritizedFields": {
"titleField": {
"fieldName": f"title_{language_suffix}"
},
"prioritizedKeywordsFields": [
{
"fieldName": f"category_{language_suffix}"
},
{
"fieldName": f"tags_{language_suffix}"
}
],
"prioritizedContentFields": [
{
"fieldName": f"content_{language_suffix}"
}
]
}
}
]
}
}
headers = {'Content-Type': 'application/json',
'api-key': os.getenv("AZURE_SEARCH_ADMIN_KEY", "") }
# Create Index
url = azure_search_endpoint + "/indexes/" + index_name + "?api-version=2024-07-01"
response = requests.get(url, headers=headers)
if response.status_code == 404:
response = requests.put(url, headers=headers, json=index_schema)
index = response.json()
print(index)
else:
print("Index already exists")
def get_document_layout(pdf_folder, doc_name):
with open(os.path.join(pdf_folder ,doc_name), "rb") as f:
poller = document_intelligence_client.begin_analyze_document(
"prebuilt-layout", analyze_request=f, content_type="application/octet-stream"
)
return poller.result()
def extract_pdf_data(pdf_folder, extract_folder):
doc_names = [os.listdir(pdf_folder)[i] for i in range(0, len(os.listdir(pdf_folder)))]
for doc_idx, doc_name in enumerate(doc_names):
# Get the document layout
document_data = []
print(f"Analyzing document: {doc_name}")
result = get_document_layout(pdf_folder, doc_name)
print(f"Layout analysis completed for document: {doc_name}")
print(f"Processing document: {doc_name}...")
for page in result.pages:
if page.lines:
page_text = ""
for line_idx, line in enumerate(page.lines):
#print(f"Line {line_idx}: {line.content}")
page_text += line.content + " "
doc_data = {
"doc_name": doc_name,
"page_number": page.page_number,
"line_number": line_idx,
"content": page_text
}
document_data.append(doc_data)
output_file_path = os.path.join(extract_folder, doc_names[doc_idx] + "-document_data.json")
with open(output_file_path, "w") as f:
json.dump(document_data, f)
def enrich_pdf_data(extracted_data_folder, output_file_name):
aml_index_data = []
system_message = """
You are an AI assitant who can extract title, topics and cateogries from a document.
You will be given a document and you need to extract the title, topics and categories from the document in json format.
Retain the language in the document while extracting the title, topics and categories.
Title: Extract the title of the document that captures the information in the document in the original document language.
Topics: Extract the topics from the document that best describe the content in the original document language.
Categories: Extract the categories from the document that best describe the content in the original document language.
Do not write ```json and ``` in your response.
json format:
{
"title": "Document Title"
"topics": ["topic1 in the do", "topic2"],
"categories": ["category1", "category2"]
}
"""
for ex_data in os.listdir(extracted_data_folder):
#print(f"Processing extracted data: {ex_data}")
with open(os.path.join(extracted_data_folder, ex_data), "r") as f:
aml_docs_json = json.loads( f.read())
print(f"Processing document: {f.name}")
for doc in aml_docs_json:
#print(f"Processing document: {doc['doc_name']}")
user_query = f"""Extract the Title, topics and categories from the document.
Document:
{doc["content"]}
"""
try:
llm_reponse =openai_helper.getOpenAIResp(user_query)
llm_json = json.loads(llm_reponse)
aml_index_item = {
"id": str(uuid.uuid4()),
"doc_name": doc["doc_name"],
"page_number": doc["page_number"],
"title": llm_json["title"],
"content": doc["content"],
"category": json.dumps(llm_json["categories"]),
"tags": json.dumps(llm_json["topics"]),
"lastupdated": str(datetime.now())
}
aml_index_data.append(aml_index_item)
except Exception as e:
with open("error.log", "a") as f:
f.write(f"Error processing document: {doc['doc_name']}, {doc['page_number']} - {e}\n")
print(f"Error processing document: {doc['doc_name']}, {doc['page_number']} - {e}")
with open(output_file_name, "w") as f:
json.dump(aml_index_data, f)
def enrich_with_embeddings(output_file_name):
with open(output_file_name, "r") as f:
aml_index_data = json.loads(f.read())
titles = []
content = []
categories = []
tags = []
for doc in aml_index_data:
titles.append(doc["title"])
content.append(doc["content"])
categories.append(doc["category"])
tags.append(doc["tags"])
batch_size = 500
for i in range(0, len(titles), batch_size):
print(f"Processing batch: {i}")
title_embeddings = openai_helper.generate_embeddings(titles[i:i+batch_size], dimensions=azure_openai_embedding_small_dimensions,
model=azure_openai_embedding__small_deployment)
content_embeddings = openai_helper.generate_embeddings(content[i:i+batch_size], dimensions=azure_openai_embedding_large_dimensions,
model=azure_openai_embedding__large_deployment)
category_embeddings = openai_helper.generate_embeddings(categories[i:i+batch_size],dimensions=azure_openai_embedding_small_dimensions,
model=azure_openai_embedding__small_deployment)
tags_embeddings = openai_helper.generate_embeddings(tags[i:i+batch_size],dimensions=azure_openai_embedding_small_dimensions, model=azure_openai_embedding__small_deployment)
for j, (title_emb, content_emb, category_emb, tag_emb) in enumerate(zip(title_embeddings,
content_embeddings,
category_embeddings,
tags_embeddings)):
aml_index_data[i+j]["titleVector"] = title_emb.embedding
aml_index_data[i+j]["contentVector"] = content_emb.embedding
aml_index_data[i+j]["categoryVector"] = category_emb.embedding
aml_index_data[i+j]["tagsVector"] = tag_emb.embedding
print(f"Embeddings generated for batch: {i}")
vector_file_name = f"{output_file_name}_with_vectors.json"
with open(vector_file_name, "w") as f:
json.dump(aml_index_data, f)
def upload_to_search(index_name, data_file, language_suffix: str = "en"):
vector_file_name = f"{data_file}_with_vectors.json"
with open(vector_file_name, "r") as f:
aml_index_data_with_vectors = json.loads(f.read())
search_client = SearchClient(endpoint=azure_search_endpoint, index_name=index_name, credential=credential)
for doc in aml_index_data_with_vectors:
last_updated = datetime.fromisoformat(doc["lastupdated"]).isoformat() + "Z"
search_doc = {
"id": doc["id"],
"docName": doc["doc_name"],
"pageNumber": str(doc["page_number"]),
f"title_{language_suffix}": doc["title"],
f"content_{language_suffix}": doc["content"],
f"category_{language_suffix}": json.loads(doc["category"]),
f"tags_{language_suffix}": json.loads(doc["tags"]),
"lastUpdated": last_updated,
"titleVector": doc["titleVector"],
"contentVector": doc["contentVector"],
"categoryVector": doc["categoryVector"],
"tagsVector": doc["tagsVector"]
}
result = search_client.upload_documents(documents=[search_doc])
print(f"{len(aml_index_data_with_vectors)} Documents uploaded to Azure Search")
def get_index_fields(index_name):
index_client = SearchIndexClient(
endpoint=azure_search_endpoint, credential=credential)
idx = index_client.get_index(index_name)
select_fields = []
vector_fields = []
for field in idx.fields:
#print(field.name)
if(field.type == SearchFieldDataType.String):
select_fields.append(field.name)
if(str.find(field.name, "Vector") > 0):
vector_fields.append(field.name)
return select_fields, vector_fields
async def retrieve_search_results(index_name: str, search_query: str, top_k: int = 3) -> str:
select_fields, vector_fields = get_index_fields(index_name)
#select_fields = ["title", "content", "category", "tags"]
search_client = SearchClient(endpoint=azure_search_endpoint, index_name=index_name, credential=credential)
#vector_query = VectorizableTextQuery(text=search_query, k_nearest_neighbors=3, fields=search_fields, exhaustive=True)
vector_queries = [VectorizableTextQuery(text=search_query, k_nearest_neighbors=top_k, fields=field) for field in vector_fields]
results = search_client.search(
search_text=search_query,
vector_queries= vector_queries,
select=select_fields,
top=top_k
)
json_results = []
for result in results:
field_results = []
for field in select_fields:
result_dict = {
field: result[field]
}
field_results.append(result_dict)
json_results.append(field_results)
print(json_results)
return f"<Context>{ json.dumps(json_results)} </Context>"