demo-python/code/custom-vectorizer/scripts/setup_search_service.py (201 lines of code) (raw):

import os

from azure.core.exceptions import ResourceNotFoundError
from azure.core.pipeline.policies import HTTPPolicy
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient
from azure.search.documents.indexes.models import (
    CustomWebApiParameters,
    CustomVectorizer,
    SearchField,
    SearchFieldDataType,
    HnswAlgorithmConfiguration,
    VectorSearch,
    VectorSearchProfile,
    SearchIndex,
    SemanticConfiguration,
    SemanticField,
    SemanticPrioritizedFields,
    SemanticSearch,
    SearchIndexerDataSourceConnection,
    SearchIndexerDataContainer,
    SplitSkill,
    SearchIndexer,
    WebApiSkill,
    InputFieldMappingEntry,
    OutputFieldMappingEntry,
    FieldMapping,
    IndexProjectionMode,
    SearchIndexerIndexProjectionSelector,
    SearchIndexerIndexProjections,
    SearchIndexerIndexProjectionsParameters,
    SearchIndexerSkillset,
)
from azure.storage.blob import BlobServiceClient
from tenacity import (
    Retrying,
    retry_if_exception_type,
    wait_random_exponential,
    stop_after_attempt,
)

# Names of the Azure Function and of the search resources this script manages.
function_name = "GetTextEmbedding"
sample_index_name = "custom-embedding-index"
sample_container_name = "custom-embedding-sample-data"
sample_datasource_name = "custom-embedding-datasource"
sample_skillset_name = "custom-embedding-skillset"
sample_indexer_name = "custom-embedding-indexer"


def main():
    """Upload the sample data and create or update every search resource.

    Reads ``AZURE_SEARCH_SERVICE`` from the environment to build the search
    endpoint; the helper functions read the remaining settings they need.
    """
    credential = DefaultAzureCredential()
    search_service_name = os.environ["AZURE_SEARCH_SERVICE"]
    search_url = f"https://{search_service_name}.search.windows.net"
    # Only the index client needs the rewrite policy: it is the client that
    # sends the index definition containing the custom vectorizer.
    search_index_client = SearchIndexClient(
        endpoint=search_url,
        credential=credential,
        per_call_policies=[CustomVectorizerRewritePolicy()],
    )
    search_indexer_client = SearchIndexerClient(endpoint=search_url, credential=credential)

    print("Uploading sample data...")
    upload_sample_data(credential)

    print("Getting function URL...")
    function_url = get_function_url(credential)

    print(f"Create or update sample index {sample_index_name}...")
    create_or_update_sample_index(search_index_client, function_url)

    print(f"Create or update sample data source {sample_datasource_name}...")
    create_or_update_datasource(search_indexer_client)

    print(f"Create or update sample skillset {sample_skillset_name}")
    create_or_update_skillset(search_indexer_client, function_url)

    print(f"Create or update sample indexer {sample_indexer_name}")
    create_or_update_indexer(search_indexer_client)


def get_function_url(credential: DefaultAzureCredential) -> str:
    """Return the invoke URL of the embedding function, including its key.

    Reads ``AZURE_SUBSCRIPTION_ID``, ``AZURE_API_SERVICE_RESOURCE_GROUP`` and
    ``AZURE_API_SERVICE`` from the environment.

    Args:
        credential: Credential used to authenticate the management client.

    Returns:
        The function's invoke URL template with ``?code=<default key>`` appended.
    """
    subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
    client = WebSiteManagementClient(credential=credential, subscription_id=subscription_id)

    resource_group = os.environ["AZURE_API_SERVICE_RESOURCE_GROUP"]
    function_app_name = os.environ["AZURE_API_SERVICE"]
    # It's possible the function is not fully provisioned by the time this script runs
    # Retry fetching the function information a few times before giving up if it's not found
    for attempt in Retrying(
        retry=retry_if_exception_type(ResourceNotFoundError),
        wait=wait_random_exponential(min=15, max=60),
        stop=stop_after_attempt(5),
    ):
        with attempt:
            embedding_function = client.web_apps.get_function(
                resource_group_name=resource_group,
                name=function_app_name,
                function_name=function_name,
            )
            embedding_function_keys = client.web_apps.list_function_keys(
                resource_group_name=resource_group,
                name=function_app_name,
                function_name=function_name,
            )
            function_url_template = embedding_function.invoke_url_template
            function_key = embedding_function_keys.additional_properties["default"]
            return f"{function_url_template}?code={function_key}"


def upload_sample_data(credential: DefaultAzureCredential):
    """Upload the sample documents to the sample blob container.

    Reads ``AZURE_STORAGE_ACCOUNT_BLOB_URL`` from the environment. The
    container is created when it does not exist, and blobs that already exist
    are left untouched. The data directory is resolved relative to the current
    working directory.

    Args:
        credential: Credential used to authenticate against Blob Storage.
    """
    # Connect to Blob Storage
    account_url = os.environ["AZURE_STORAGE_ACCOUNT_BLOB_URL"]
    blob_service_client = BlobServiceClient(account_url=account_url, credential=credential)
    container_client = blob_service_client.get_container_client(sample_container_name)
    if not container_client.exists():
        container_client.create_container()

    sample_data_directory_name = os.path.join("..", "..", "..", "data", "benefitdocs")
    sample_data_directory = os.path.join(os.getcwd(), sample_data_directory_name)
    for filename in os.listdir(sample_data_directory):
        file_path = os.path.join(sample_data_directory, filename)
        # listdir also returns sub-directories, which cannot be opened as files.
        if not os.path.isfile(file_path):
            continue
        blob_client = container_client.get_blob_client(filename)
        # Check first so files that are already uploaded are never opened.
        if blob_client.exists():
            continue
        print(f"Uploading {filename}...")
        with open(file_path, "rb") as f:
            blob_client.upload_blob(data=f)


def create_or_update_sample_index(search_index_client: SearchIndexClient, custom_vectorizer_url: str):
    """Create or update the sample index with vector and semantic search.

    Args:
        search_index_client: Client used to send the index definition.
        custom_vectorizer_url: URI the index's custom vectorizer calls.
    """
    # Create a search index
    fields = [
        SearchField(name="parent_id", type=SearchFieldDataType.String, sortable=True, filterable=True, facetable=True),
        SearchField(name="title", type=SearchFieldDataType.String),
        SearchField(name="chunk_id", type=SearchFieldDataType.String, key=True, sortable=True, filterable=True, facetable=True, analyzer_name="keyword"),
        SearchField(name="chunk", type=SearchFieldDataType.String, sortable=False, filterable=False, facetable=False),
        SearchField(name="vector", type=SearchFieldDataType.Collection(SearchFieldDataType.Single), vector_search_dimensions=384, vector_search_profile_name="hnswProfile"),
    ]

    # Configure the vector search configuration
    vector_search = VectorSearch(
        algorithms=[
            HnswAlgorithmConfiguration(
                name="hnsw"
            )
        ],
        profiles=[
            VectorSearchProfile(
                name="hnswProfile",
                algorithm_configuration_name="hnsw",
                vectorizer="customVectorizer",
            )
        ],
        vectorizers=[
            CustomVectorizer(name="customVectorizer", custom_web_api_parameters=CustomWebApiParameters(uri=custom_vectorizer_url))
        ],
    )

    semantic_config = SemanticConfiguration(
        name="my-semantic-config",
        prioritized_fields=SemanticPrioritizedFields(
            content_fields=[SemanticField(field_name="chunk")]
        ),
    )

    # Create the semantic settings with the configuration
    semantic_search = SemanticSearch(configurations=[semantic_config])

    # Create the search index with the semantic settings
    index = SearchIndex(name=sample_index_name, fields=fields, vector_search=vector_search, semantic_search=semantic_search)
    search_index_client.create_or_update_index(index)


def create_or_update_datasource(search_indexer_client: SearchIndexerClient):
    """Create or update the blob data source connection for the sample container.

    Reads ``AZURE_STORAGE_ACCOUNT_ID`` from the environment and uses it as a
    ``ResourceId=`` connection string.

    Args:
        search_indexer_client: Client used to send the data source definition.
    """
    storage_resource_id = os.environ["AZURE_STORAGE_ACCOUNT_ID"]
    data_source = SearchIndexerDataSourceConnection(
        name=sample_datasource_name,
        type="azureblob",
        connection_string=f"ResourceId={storage_resource_id};",
        container=SearchIndexerDataContainer(name=sample_container_name))
    search_indexer_client.create_or_update_data_source_connection(data_source)


def create_or_update_skillset(search_indexer_client: SearchIndexerClient, custom_vectorizer_url: str):
    """Create or update the skillset that chunks documents and embeds each chunk.

    Args:
        search_indexer_client: Client used to send the skillset definition.
        custom_vectorizer_url: URI of the custom endpoint called by the
            embedding skill.
    """
    split_skill = SplitSkill(
        description="Split skill to chunk documents",
        text_split_mode="pages",
        context="/document",
        maximum_page_length=300,
        page_overlap_length=20,
        inputs=[
            InputFieldMappingEntry(name="text", source="/document/content"),
        ],
        outputs=[
            OutputFieldMappingEntry(name="textItems", target_name="pages")
        ],
    )

    embedding_skill = WebApiSkill(
        description="Skill to generate embeddings via a custom endpoint",
        context="/document/pages/*",
        uri=custom_vectorizer_url,
        inputs=[
            InputFieldMappingEntry(name="text", source="/document/pages/*"),
        ],
        outputs=[
            OutputFieldMappingEntry(name="vector", target_name="vector")
        ],
    )

    # Project every page (chunk) into the sample index as its own document.
    index_projections = SearchIndexerIndexProjections(
        selectors=[
            SearchIndexerIndexProjectionSelector(
                target_index_name=sample_index_name,
                parent_key_field_name="parent_id",
                source_context="/document/pages/*",
                mappings=[
                    InputFieldMappingEntry(name="chunk", source="/document/pages/*"),
                    InputFieldMappingEntry(name="vector", source="/document/pages/*/vector"),
                    InputFieldMappingEntry(name="title", source="/document/metadata_storage_name"),
                ],
            ),
        ],
        parameters=SearchIndexerIndexProjectionsParameters(
            projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS
        ),
    )

    skillset = SearchIndexerSkillset(
        name=sample_skillset_name,
        description="Skillset to chunk documents and generating embeddings",
        skills=[split_skill, embedding_skill],
        index_projections=index_projections,
    )

    search_indexer_client.create_or_update_skillset(skillset)


def create_or_update_indexer(search_indexer_client: SearchIndexerClient):
    """Create or update the indexer tying the data source, skillset and index together.

    Args:
        search_indexer_client: Client used to send the indexer definition.
    """
    indexer = SearchIndexer(
        name=sample_indexer_name,
        description="Indexer to index documents and generate embeddings",
        skillset_name=sample_skillset_name,
        target_index_name=sample_index_name,
        data_source_name=sample_datasource_name,
        # Map the metadata_storage_name field to the title field in the index to display the PDF title in the search results
        field_mappings=[FieldMapping(source_field_name="metadata_storage_name", target_field_name="title")]
    )

    search_indexer_client.create_or_update_indexer(indexer)


# Workaround required to use the preview SDK
class CustomVectorizerRewritePolicy(HTTPPolicy):
    """Pipeline policy that renames a property in outgoing request bodies.

    Replaces every occurrence of ``customVectorizerParameters`` with
    ``customWebApiParameters`` before the request is passed on.
    NOTE(review): presumably the preview SDK serializes the vectorizer under a
    property name the service does not accept - confirm against the SDK version.
    """

    def send(self, request):
        """Rewrite the request body if it has one, then forward the request.

        Requests without a body (``None``) or with a body that is neither
        ``str`` nor ``bytes`` are forwarded unchanged.
        """
        body = request.http_request.body
        if isinstance(body, str):
            request.http_request.body = body.replace('customVectorizerParameters', 'customWebApiParameters')
        elif isinstance(body, bytes):
            # bytes.replace needs bytes arguments; the str form would raise TypeError.
            request.http_request.body = body.replace(b'customVectorizerParameters', b'customWebApiParameters')
        return self.next.send(request)


if __name__ == "__main__":
    main()