demo-python/code/embeddings/multimodal-embeddings/scripts/setup.py (192 lines of code) (raw):

"""Provision the Azure resources used by the multimodal-embeddings demo.

Running this script uploads the sample images to blob storage and then
creates (or updates) the search index, skillset, data source and indexer.
All settings are read from environment variables; see ``_REQUIRED_ENV_VARS``.
"""

import asyncio
import datetime
import glob
import os

from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import BlobServiceClient
from azure.search.documents.indexes.aio import SearchIndexClient, SearchIndexerClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SearchIndexerSkillset,
    SearchIndexer,
    SearchIndexerDataSourceConnection,
    SearchIndexerDataContainer,
    SearchIndexerDataSourceType,
    SearchIndexerIndexProjections,
    SearchIndexerIndexProjectionSelector,
    SearchIndexerIndexProjectionsParameters,
    IndexProjectionMode,
    IndexingParameters,
    IndexingParametersConfiguration,
    BlobIndexerImageAction,
    VisionVectorizeSkill,
    InputFieldMappingEntry,
    OutputFieldMappingEntry,
    SearchFieldDataType,
    ScalarQuantizationCompressionConfiguration,
    VectorSearchProfile,
    VectorSearch,
    SearchField,
    SearchableField,
    SimpleField,
    AIServicesVisionVectorizer,
    AIServicesVisionParameters,
    HnswAlgorithmConfiguration,
    LexicalAnalyzerName,
)

current_file_directory = os.path.dirname(os.path.abspath(__file__))
# Sample images live five directories above this script, under data/images/apples.
samples_path = os.path.join(
    current_file_directory, "..", "..", "..", "..", "..", "data", "images", "apples"
)
# Shared by the index vectorizer and the skillset's vectorize skill so both
# are configured with the same model version.
vision_model_version = "2023-04-15"

# Every environment variable this script reads. Checked up front in main()
# so a misconfigured environment fails before anything is uploaded or created.
_REQUIRED_ENV_VARS = (
    "AZURE_STORAGE_ACCOUNT_BLOB_URL",
    "AZURE_STORAGE_CONTAINER",
    "AZURE_STORAGE_ACCOUNT_ID",
    "AZURE_SEARCH_ENDPOINT",
    "AZURE_SEARCH_INDEX",
    "AZURE_SEARCH_SKILLSET",
    "AZURE_SEARCH_DATASOURCE",
    "AZURE_SEARCH_INDEXER",
    "AZURE_AI_SERVICES_ENDPOINT",
)


def _require_env(name: str) -> str:
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty. Without this check a
            missing value would propagate as ``None`` (for example producing
            the connection string ``"ResourceId=None"``) and only fail later
            with a much less helpful error.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value


def _check_environment() -> None:
    """Raise RuntimeError naming every required environment variable that is missing."""
    missing = [name for name in _REQUIRED_ENV_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )


async def main():
    """Upload the sample images, then create the index, skillset, data source and indexer.

    Raises:
        RuntimeError: if any required environment variable is missing; raised
            before any remote call is made.
    """
    _check_environment()

    async with DefaultAzureCredential() as credential:
        print("Uploading sample images...")
        blob_url = _require_env("AZURE_STORAGE_ACCOUNT_BLOB_URL")
        async with BlobServiceClient(account_url=blob_url, credential=credential) as blob_service_client:
            await upload_images(blob_service_client)

        print("Creating index...")
        search_endpoint = _require_env("AZURE_SEARCH_ENDPOINT")
        async with SearchIndexClient(endpoint=search_endpoint, credential=credential) as search_index_client:
            await create_index(search_index_client)

        # The skillset and data source are created before the indexer,
        # which refers to both of them by name.
        async with SearchIndexerClient(endpoint=search_endpoint, credential=credential) as search_indexer_client:
            print("Creating skillset...")
            await create_skillset(search_indexer_client)
            print("Creating datasource...")
            await create_datasource(search_indexer_client)
            print("Creating indexer...")
            await create_indexer(search_indexer_client)

        print("Done")


async def upload_images(blob_service_client: BlobServiceClient) -> None:
    """Upload every ``*.jpeg`` file under ``samples_path`` to the configured container.

    Each blob is named after the file's base name. Blobs that already exist
    are left untouched, so re-running the script does not re-upload them.
    """
    container_client = blob_service_client.get_container_client(
        _require_env("AZURE_STORAGE_CONTAINER")
    )
    image_paths = glob.glob(os.path.join(samples_path, "*.jpeg"))
    for image_path in image_paths:
        blob_name = os.path.basename(image_path)
        async with container_client.get_blob_client(blob_name) as blob_client:
            if await blob_client.exists():
                continue
            with open(image_path, "rb") as data:
                await blob_client.upload_blob(data=data)


async def create_index(search_index_client: SearchIndexClient) -> None:
    """Create or update the search index that stores one embedding per image.

    The index has a string key (``id``), the parent ``document_id``, a
    1024-dimension ``embedding`` vector field and the source blob's
    ``metadata_storage_path``. Vector search uses an HNSW algorithm with
    scalar quantization and an AI Services vision vectorizer.
    """
    fields = [
        SearchableField(
            name="id",
            type=SearchFieldDataType.String,
            key=True,
            filterable=True,
            analyzer_name=LexicalAnalyzerName.KEYWORD,
        ),
        SearchableField(
            name="document_id",
            type=SearchFieldDataType.String,
            key=False,
            filterable=True,
            analyzer_name=LexicalAnalyzerName.KEYWORD,
        ),
        SearchField(
            name="embedding",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            searchable=True,
            stored=False,
            vector_search_dimensions=1024,
            vector_search_profile_name="approximateProfile",
        ),
        SimpleField(
            name="metadata_storage_path",
            type=SearchFieldDataType.String,
            filterable=True,
        ),
    ]

    vector_search = VectorSearch(
        profiles=[
            VectorSearchProfile(
                name="approximateProfile",
                algorithm_configuration_name="approximateConfiguration",
                vectorizer="multimodal",
                compression_configuration_name="scalarQuantization",
            )
        ],
        algorithms=[
            HnswAlgorithmConfiguration(name="approximateConfiguration")
        ],
        vectorizers=[
            AIServicesVisionVectorizer(
                name="multimodal",
                ai_services_vision_parameters=AIServicesVisionParameters(
                    model_version=vision_model_version,
                    resource_uri=_require_env("AZURE_AI_SERVICES_ENDPOINT"),
                    # NOTE(review): no key is supplied; presumably the search
                    # service authenticates with its own identity - confirm.
                    api_key=None,
                ),
            )
        ],
        compressions=[
            ScalarQuantizationCompressionConfiguration(name="scalarQuantization")
        ],
    )

    index = SearchIndex(
        name=_require_env("AZURE_SEARCH_INDEX"),
        fields=fields,
        vector_search=vector_search,
    )
    await search_index_client.create_or_update_index(index)


async def create_skillset(search_indexer_client: SearchIndexerClient) -> None:
    """Create or update the skillset that vectorizes each normalized image.

    A single vision vectorize skill runs once per entry under
    ``/document/normalized_images/*``. Index projections then write each
    image's vector and the parent blob's storage path to the target index,
    keyed back to the parent through ``document_id``.
    """
    vectorize_skill = VisionVectorizeSkill(
        name="visionvectorizer",
        context="/document/normalized_images/*",
        inputs=[
            InputFieldMappingEntry(
                name="image",
                source="/document/normalized_images/*",
            )
        ],
        outputs=[
            OutputFieldMappingEntry(name="vector")
        ],
        model_version=vision_model_version,
    )

    index_projections = SearchIndexerIndexProjections(
        selectors=[
            SearchIndexerIndexProjectionSelector(
                target_index_name=_require_env("AZURE_SEARCH_INDEX"),
                parent_key_field_name="document_id",
                source_context="/document/normalized_images/*",
                mappings=[
                    InputFieldMappingEntry(
                        name="embedding",
                        source="/document/normalized_images/*/vector",
                    ),
                    InputFieldMappingEntry(
                        name="metadata_storage_path",
                        source="/document/metadata_storage_path",
                    ),
                ],
            )
        ],
        parameters=SearchIndexerIndexProjectionsParameters(
            projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS
        ),
    )

    skillset = SearchIndexerSkillset(
        name=_require_env("AZURE_SEARCH_SKILLSET"),
        skills=[vectorize_skill],
        index_projections=index_projections,
    )
    await search_indexer_client.create_or_update_skillset(skillset)


async def create_datasource(search_indexer_client: SearchIndexerClient) -> None:
    """Create or update the blob data source connection the indexer reads from.

    The connection string has the form ``ResourceId=<storage account id>``
    rather than embedding an account key.
    """
    storage_account_id = _require_env("AZURE_STORAGE_ACCOUNT_ID")
    datasource = SearchIndexerDataSourceConnection(
        name=_require_env("AZURE_SEARCH_DATASOURCE"),
        type=SearchIndexerDataSourceType.AZURE_BLOB,
        connection_string=f"ResourceId={storage_account_id}",
        container=SearchIndexerDataContainer(name=_require_env("AZURE_STORAGE_CONTAINER")),
    )
    await search_indexer_client.create_or_update_data_source_connection(datasource)


async def create_indexer(search_indexer_client: SearchIndexerClient) -> None:
    """Create or update the indexer that ties data source, skillset and index together.

    The indexer is configured to generate normalized images, which is the
    enrichment path the skillset reads from.
    """
    configuration = IndexingParametersConfiguration(
        image_action=BlobIndexerImageAction.GENERATE_NORMALIZED_IMAGES,
        # NOTE(review): explicitly None, presumably so no query timeout is
        # sent for a blob data source - confirm against the SDK default.
        query_timeout=None,
    )
    indexer = SearchIndexer(
        name=_require_env("AZURE_SEARCH_INDEXER"),
        data_source_name=_require_env("AZURE_SEARCH_DATASOURCE"),
        target_index_name=_require_env("AZURE_SEARCH_INDEX"),
        skillset_name=_require_env("AZURE_SEARCH_SKILLSET"),
        parameters=IndexingParameters(configuration=configuration),
    )
    await search_indexer_client.create_or_update_indexer(indexer)


if __name__ == "__main__":
    asyncio.run(main())