demo-python/code/phi-chat/scripts/setup.py (231 lines of code) (raw):

import asyncio from azure.identity.aio import DefaultAzureCredential from azure.storage.blob.aio import BlobServiceClient from azure.search.documents.indexes.aio import SearchIndexClient, SearchIndexerClient from azure.search.documents.indexes.models import ( SearchIndex, SearchIndexerSkillset, SearchIndexer, SearchIndexerDataSourceConnection, SearchIndexerDataContainer, SearchIndexerDataSourceType, SearchIndexerIndexProjections, SearchIndexerIndexProjectionSelector, SearchIndexerIndexProjectionsParameters, IndexProjectionMode, AzureOpenAIEmbeddingSkill, SplitSkill, InputFieldMappingEntry, OutputFieldMappingEntry, SearchFieldDataType, ScalarQuantizationCompressionConfiguration, VectorSearchProfile, VectorSearch, SearchField, SearchableField, SimpleField, AzureOpenAIVectorizer, AzureOpenAIParameters, HnswAlgorithmConfiguration, LexicalAnalyzerName, SemanticSearch, SemanticConfiguration, SemanticField, SemanticPrioritizedFields ) import os import glob current_file_directory = os.path.dirname(os.path.abspath(__file__)) samples_path = os.path.join(current_file_directory, "..", "..", "..", "..", "data", "benefitdocs") async def main(): async with DefaultAzureCredential() as credential: print("Uploading sample documents...") blob_url = os.getenv("AZURE_STORAGE_ACCOUNT_BLOB_URL") async with BlobServiceClient(account_url=blob_url, credential=credential) as blob_service_client: await upload_documents(blob_service_client) print("Creating index...") search_endpoint = os.getenv("AZURE_SEARCH_ENDPOINT") async with SearchIndexClient(endpoint=search_endpoint, credential=credential) as search_index_client: await create_index(search_index_client) async with SearchIndexerClient(endpoint=search_endpoint, credential=credential) as search_indexer_client: print("Creating skillset...") await create_skillset(search_indexer_client) print("Creating datasource...") await create_datasource(search_indexer_client) print("Creating indexer...") await create_indexer(search_indexer_client) print("Done") async def upload_documents(blob_service_client: BlobServiceClient): container_client = blob_service_client.get_container_client(os.getenv("AZURE_STORAGE_CONTAINER")) image_paths = glob.glob(os.path.join(samples_path, "*.pdf")) for image_path in image_paths: async with container_client.get_blob_client(os.path.basename(image_path)) as blob_client: if not await blob_client.exists(): with open(image_path, "rb") as data: await blob_client.upload_blob(data=data) async def create_index(search_index_client: SearchIndexClient): index = SearchIndex( name=os.getenv("AZURE_SEARCH_INDEX"), fields=[ SearchableField( name="id", type=SearchFieldDataType.String, key=True, filterable=True, analyzer_name=LexicalAnalyzerName.KEYWORD ), SearchableField( name="document_id", type=SearchFieldDataType.String, key=False, filterable=True, analyzer_name=LexicalAnalyzerName.KEYWORD ), SearchField( name="embedding", type=SearchFieldDataType.Collection(SearchFieldDataType.Single), searchable=True, stored=False, vector_search_dimensions=int(os.getenv("AZURE_OPENAI_EMB_MODEL_DIMENSIONS")), vector_search_profile_name="approximateProfile" ), SearchField( name="content", type=SearchFieldDataType.String, searchable=True, filterable=False, facetable=False, sortable=False ), SimpleField( name="metadata_storage_path", type=SearchFieldDataType.String, filterable=True ), SimpleField( name="metadata_storage_name", type=SearchFieldDataType.String, filterable=True ) ], vector_search=VectorSearch( profiles=[ VectorSearchProfile( name="approximateProfile", algorithm_configuration_name="approximateConfiguration", vectorizer="text-embedding", compression_configuration_name="scalarQuantization" ) ], algorithms=[ HnswAlgorithmConfiguration(name="approximateConfiguration") ], vectorizers=[ AzureOpenAIVectorizer( name="text-embedding", azure_open_ai_parameters=AzureOpenAIParameters( model_name=os.getenv("AZURE_OPENAI_EMB_MODEL_NAME"), deployment_id=os.getenv("AZURE_OPENAI_EMB_DEPLOYMENT"), resource_uri=os.getenv("AZURE_OPENAI_ENDPOINT"), api_key=None ) ) ], compressions=[ ScalarQuantizationCompressionConfiguration(name="scalarQuantization") ] ), semantic_search=SemanticSearch( default_configuration_name="semantic-config", configurations=[ SemanticConfiguration( name="semantic-config", prioritized_fields=SemanticPrioritizedFields( title_field=SemanticField(field_name="metadata_storage_name"), content_fields=[SemanticField(field_name="content")] ) ) ] ) ) await search_index_client.create_or_update_index(index) async def create_skillset(search_indexer_client: SearchIndexerClient): skillset = SearchIndexerSkillset( name=os.getenv("AZURE_SEARCH_SKILLSET"), skills=[ AzureOpenAIEmbeddingSkill( description="Skill to generate embeddings via Azure OpenAI", context="/document/pages/*", model_name=os.getenv("AZURE_OPENAI_EMB_MODEL_NAME"), resource_uri=os.getenv("AZURE_OPENAI_ENDPOINT"), deployment_id=os.getenv("AZURE_OPENAI_EMB_DEPLOYMENT"), api_key=None, inputs=[ InputFieldMappingEntry(name="text", source="/document/pages/*"), ], outputs=[ OutputFieldMappingEntry(name="embedding") ] ), SplitSkill( description="Split skill to chunk documents", text_split_mode="pages", context="/document", maximum_page_length=2000, page_overlap_length=500, inputs=[ InputFieldMappingEntry(name="text", source="/document/content"), ], outputs=[ OutputFieldMappingEntry(name="textItems", target_name="pages") ], ) ], index_projections=SearchIndexerIndexProjections( selectors=[ SearchIndexerIndexProjectionSelector( target_index_name=os.getenv("AZURE_SEARCH_INDEX"), parent_key_field_name="document_id", source_context="/document/pages/*", mappings=[ InputFieldMappingEntry( name="embedding", source="/document/pages/*/embedding" ), InputFieldMappingEntry( name="content", source="/document/pages/*" ), InputFieldMappingEntry( name="metadata_storage_path", source="/document/metadata_storage_path" ), InputFieldMappingEntry( name="metadata_storage_name", source="/document/metadata_storage_name" ) ] ) ], parameters=SearchIndexerIndexProjectionsParameters(projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS) ) ) await search_indexer_client.create_or_update_skillset(skillset) async def create_datasource(search_indexer_client: SearchIndexerClient): datasource = SearchIndexerDataSourceConnection( name=os.getenv("AZURE_SEARCH_DATASOURCE"), type=SearchIndexerDataSourceType.AZURE_BLOB, connection_string=f"ResourceId={os.getenv('AZURE_STORAGE_ACCOUNT_ID')}", container=SearchIndexerDataContainer(name=os.getenv("AZURE_STORAGE_CONTAINER")) ) await search_indexer_client.create_or_update_data_source_connection(datasource) async def create_indexer(search_indexer_client: SearchIndexerClient): indexer = SearchIndexer( name=os.getenv("AZURE_SEARCH_INDEXER"), data_source_name=os.getenv("AZURE_SEARCH_DATASOURCE"), target_index_name=os.getenv("AZURE_SEARCH_INDEX"), skillset_name=os.getenv("AZURE_SEARCH_SKILLSET") ) await search_indexer_client.create_or_update_indexer(indexer) if __name__ == "__main__": asyncio.run(main())