# Azure AI Search with Custom LLM Skills Demo

In this notebook, we demonstrate how to use **Azure AI Studio** models, such as OpenAI's GPT-4o-mini and Microsoft's Phi-3.5-vision, as custom skills in an Azure AI Search enrichment pipeline. Integrating these models can improve index quality, produce additional fields for filtering and for grounding language models, enable multimodal Retrieval-Augmented Generation (RAG), and let you choose among the models available in the **Azure AI Studio Model Catalog**.

**This unofficial code sample was created for an Ignite 2024 demo. It's offered "as-is" and might not work for all customers and all scenarios.**

## Benefits
- **Higher Index Quality**: Achieve more accurate and comprehensive search indexes.
- **Enhanced Data Utilization**: Leverage additional data to improve filters and assist language models.
- **Multimodal RAG**: Enable Retrieval-Augmented Generation across different data types and modalities.
- **Model Flexibility**: Choose from a variety of models in the **Azure AI Studio Model Catalog** to suit your specific needs.

## Prerequisites
- 🐍 Python 3.9 or higher
- 🔗 [Azure AI Search Service](https://learn.microsoft.com/azure/search/)
- 🔗 [Azure AI Inference API](https://learn.microsoft.com/azure/ai-studio/ai-services/model-inference)
- 🔗 [Azure AI Studio Model Catalog](https://learn.microsoft.com/azure/ai-studio/how-to/model-catalog-overview)
- 🔗 [Phi-3.5-vision](https://github.com/microsoft/Phi-3CookBook/blob/main/md/01.Introduce/Phi3Family.md)
- 🔗 [Azure OpenAI Text Embeddings](https://learn.microsoft.com/azure/search/cognitive-search-skill-azure-openai-embedding)
- 🔗 [Azure AI Search Power Skills Repo](https://github.com/Azure-Samples/azure-search-power-skills)


## Features Covered
- ✅ Blob Storage Data Source
- ✅ Azure OpenAI Text Embeddings
- ✅ Integrated Vectorization in Azure AI Search
- ✅ Deploying a Custom Azure Function Skill
- ✅ Multimodal Retrieval-Augmented Generation (RAG)
- ✅ Flexible Model Selection from Azure AI Studio Model Catalog

## Scenarios Demonstrated
1. **Image Captioning**: Generate descriptive captions for images.
2. **Document Summarization**: Create concise summaries of lengthy documents.
3. **Entity Extraction**: Extract key entities from documents using custom skills for index augmentation and enrichment.

Let's get started!

## Install required libraries

In [2]:
! pip install --quiet -r azure-ai-search-LM-custom-skill-requirements.txt

# Azure Function App Deployment Instructions

Go to https://github.com/Azure-Samples/azure-search-power-skills and select either [AzureAIStudioCustomInferenceSkill](https://github.com/Azure-Samples/azure-search-power-skills/tree/main/AzureAIStudioCustomInferenceSkill) or [AzureOpenAICustomInferenceSkill](https://github.com/Azure-Samples/azure-search-power-skills/tree/main/AzureOpenAICustomInferenceSkill), or both if you want to use multiple language models.

## Prerequisites
- Azure Functions Core Tools installed
- Azure CLI installed and authenticated
- Local function app code ready for deployment

## Creating the Function Apps

Once you have forked the Power Skills Repo, select the Power Skill you want to deploy. Follow the README.MD for the power skill to get it up and running locally. Then, create your function apps using Azure CLI:

```bash
az functionapp create \
    --resource-group <RESOURCE_GROUP> \
    --consumption-plan-location <LOCATION> \
    --runtime python \
    --runtime-version 3.11 \
    --functions-version 4 \
    --name <FUNCTION_APP_NAME_OPENAI> \
    --os-type linux \
    --storage-account <STORAGE_ACCOUNT>
```

Replace the following placeholders with your values:
- `<RESOURCE_GROUP>`: Your Azure resource group name
- `<LOCATION>`: Azure region (e.g., eastus, westeurope)
- `<FUNCTION_APP_NAME_OPENAI>`: Name for your new function app. To deploy the AI Studio skill as well, run the command a second time with a different name (`<FUNCTION_APP_NAME_AISTUDIO>`)
- `<STORAGE_ACCOUNT>`: Storage account name to be used by the function app

## Deploying the Function Apps

Deploy your function apps using Azure Functions Core Tools:

```bash
# Deploy AI Studio Model as a Service Function App
func azure functionapp publish <FUNCTION_APP_NAME_AISTUDIO> --publish-local-settings 

# Deploy Azure OpenAI Function App
func azure functionapp publish <FUNCTION_APP_NAME_OPENAI> --publish-local-settings
```

## Important Notes
- Run each deployment command from the respective function app's root directory
- The `--publish-local-settings` flag will upload your local.settings.json configuration
- Make sure your local.settings.json contains all necessary application settings
- Verify your deployments in the Azure Portal after completion

## Verification Steps
1. Check the Azure Portal to ensure both function apps are running
2. Monitor the function logs for any deployment issues
3. Test the endpoints to verify functionality
4. Review application settings to ensure all configurations were properly uploaded
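
To make step 3 more concrete, here is a minimal sketch of an endpoint test. It builds a request in the shape Azure AI Search sends to a custom Web API skill (a `values` array whose records each carry a `recordId` and a `data` object) and checks that a response returns every `recordId`. The `/api/custom_skill` route and the `scenario` header come from the skill definitions later in this notebook; the base URL, the sample text, and the helper names `build_skill_request` and `check_skill_response` are hypothetical. Nothing is sent over the network unless you uncomment the final lines.

```python
import json
import urllib.request


def build_skill_request(base_url, scenario, records):
    """Build (but do not send) a POST request in the custom skill format."""
    payload = {
        "values": [
            {"recordId": str(i), "data": data} for i, data in enumerate(records)
        ]
    }
    return urllib.request.Request(
        url=f"{base_url}/api/custom_skill",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "scenario": scenario},
        method="POST",
    )


def check_skill_response(request_body, response_body):
    """Return a list of problems found in a custom skill response."""
    problems = []
    sent_ids = {v["recordId"] for v in request_body["values"]}
    returned = response_body.get("values", [])
    returned_ids = {v.get("recordId") for v in returned}
    for missing in sorted(sent_ids - returned_ids):
        problems.append(f"record {missing} missing from response")
    for value in returned:
        if value.get("errors"):
            problems.append(f"record {value.get('recordId')} reported errors")
        elif "data" not in value:
            problems.append(f"record {value.get('recordId')} has no data")
    return problems


# Hypothetical base URL; replace it with your deployed function app's URL.
request = build_skill_request(
    "https://my-function-app.azurewebsites.net",
    "summarization",
    [{"text": "Contoso plans to migrate its workloads to the cloud."}],
)
sent = json.loads(request.data)
print(json.dumps(sent, indent=2))

# To call the endpoint for real (needs network access, and a function key
# if your function requires one):
# with urllib.request.urlopen(request, timeout=60) as resp:
#     print(check_skill_response(sent, json.load(resp)))
```

An empty list from `check_skill_response` means every record came back with data and no reported errors.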


## Import Libraries

In [3]:
import os

from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient
from azure.search.documents.indexes.models import (
    AzureOpenAIEmbeddingSkill,
    AzureOpenAIModelName,
    AzureOpenAIVectorizer,
    AzureOpenAIVectorizerParameters,
    FieldMapping,
    HnswAlgorithmConfiguration,
    HnswParameters,
    IndexerExecutionStatus,
    IndexingParameters,
    IndexingParametersConfiguration,
    IndexProjectionMode,
    InputFieldMappingEntry,
    MergeSkill,
    OutputFieldMappingEntry,
    SearchField,
    SearchableField,
    SearchFieldDataType,
    SearchIndex,
    SearchIndexer,
    SearchIndexerDataContainer,
    SearchIndexerDataSourceConnection,
    SearchIndexerDataSourceType,
    SearchIndexerIndexProjection,
    SearchIndexerIndexProjectionSelector,
    SearchIndexerIndexProjectionsParameters,
    SearchIndexerSkillset,
    SimpleField,
    SplitSkill,
    VectorSearch,
    VectorSearchAlgorithmMetric,
    VectorSearchProfile,
    WebApiSkill,
)
from azure.search.documents.models import VectorizableTextQuery
from dotenv import load_dotenv

In [None]:
# Load environment variables
load_dotenv()

# Azure AI Studio (Phi) configuration
AZURE_AI_STUDIO_PHI_3_API_KEY = os.getenv("AZURE_AI_STUDIO_PHI_3_API_KEY")
AZURE_AI_STUDIO_PHI_3_ENDPOINT = os.getenv("AZURE_AI_STUDIO_PHI_3_ENDPOINT")

# Azure OpenAI configuration
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_EMBED_ENDPOINT = os.getenv("AZURE_OPENAI_EMBED_ENDPOINT")
AZURE_OPENAI_EMBED_API_KEY = os.getenv("AZURE_OPENAI_EMBED_API_KEY")

# Name prefix shared by the index, data source, skillset, and indexer
INDEX_NAME = "phi35-enriched-content"

# Azure AI Search service configuration
SEARCH_SERVICE_API_KEY = os.getenv("AZURE_SEARCH_ADMIN_KEY")
SEARCH_SERVICE_ENDPOINT = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")

# Blob Storage configuration
BLOB_CONNECTION_STRING = os.getenv("BLOB_CONNECTION_STRING")
BLOB_STORAGE_ACCOUNT_KEY = os.getenv("BLOB_STORAGE_ACCOUNT_KEY")
BLOB_CONTAINER_NAME = "mini-contoso"

# Base URLs of the deployed custom skill function apps (no trailing slash)
CUSTOM_PHI3_FUNCTION_BASE_URL = "YOUR-CUSTOM-PHI3-FUNCTION-APP-URL"
CUSTOM_AOAI_FUNCTION_BASE_URL = "YOUR-CUSTOM-AOAI-FUNCTION-APP-URL"
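
Because `os.getenv` silently returns `None` for a variable that is not set, a missing entry in `.env` only surfaces later as a confusing service error. The following is a small sketch of an upfront check. The helper name `find_missing_settings` is hypothetical, and the `REQUIRED_SETTINGS` list is an assumption about which variables a key-based run of this notebook needs; adjust it to your setup.

```python
import os


def find_missing_settings(names, env=None):
    """Return the setting names that are unset or empty in the environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# Assumed list for key-based authentication; edit to match your configuration.
REQUIRED_SETTINGS = [
    "AZURE_SEARCH_SERVICE_ENDPOINT",
    "AZURE_SEARCH_ADMIN_KEY",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_EMBED_ENDPOINT",
    "AZURE_OPENAI_EMBED_API_KEY",
    "BLOB_CONNECTION_STRING",
]

missing = find_missing_settings(REQUIRED_SETTINGS)
if missing:
    print("Missing settings:", ", ".join(missing))
else:
    print("All required settings are present.")
```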

In [24]:
# User-specified parameter
USE_AAD_FOR_SEARCH = False  

def authenticate_azure_search(api_key=None, use_aad_for_search=False):
    if use_aad_for_search:
        print("Using AAD for authentication.")
        credential = DefaultAzureCredential()
    else:
        print("Using API keys for authentication.")
        if api_key is None:
            raise ValueError("API key must be provided if not using AAD for authentication.")
        credential = AzureKeyCredential(api_key)
    return credential

azure_search_credential = authenticate_azure_search(api_key=SEARCH_SERVICE_API_KEY, use_aad_for_search=USE_AAD_FOR_SEARCH)


Using API keys for authentication.


In [31]:
# Initialize the SearchIndexerClient with a credential
indexer_client = SearchIndexerClient(SEARCH_SERVICE_ENDPOINT, azure_search_credential)

# Create or update a data source connection
container = SearchIndexerDataContainer(name=BLOB_CONTAINER_NAME)
data_source_connection = SearchIndexerDataSourceConnection(
    name=f"{INDEX_NAME}-blob",
    type=SearchIndexerDataSourceType.AZURE_BLOB,
    connection_string=BLOB_CONNECTION_STRING,
    container=container,
)
data_source = indexer_client.create_or_update_data_source_connection(data_source_connection)

print(f"Data source '{data_source.name}' created or updated")

Data source 'phi35-enriched-content-blob' created or updated


In [None]:
# Initialize the SearchIndexClient
index_client = SearchIndexClient(
    endpoint=SEARCH_SERVICE_ENDPOINT,
    credential=azure_search_credential,
)

# Define the fields to match the index.json schema
fields = [
    SearchableField(
        name="chunk_id",
        type=SearchFieldDataType.String,
        key=True,
        sortable=True,
        analyzer_name="keyword",
        filterable=True,
    ),
    # parent_id is not full-text searchable, so it takes no analyzer
    SimpleField(
        name="parent_id",
        type=SearchFieldDataType.String,
        filterable=True,
    ),
    SearchableField(
        name="chunk", type=SearchFieldDataType.String, analyzer_name="standard.lucene"
    ),
    SearchableField(
        name="parent_summary",
        type=SearchFieldDataType.String,
        analyzer_name="standard.lucene",
    ),
    SearchableField(
        name="entities",
        collection=True,
        type=SearchFieldDataType.String,
        facetable=True,
        analyzer_name="standard.lucene",
    ),
    SearchableField(
        name="title", type=SearchFieldDataType.String, analyzer_name="standard.lucene"
    ),
    SimpleField(
        name="metadata_storage_path",
        type=SearchFieldDataType.String,
        filterable=True,
        facetable=True,
    ),
    SearchField(
        name="text_embedding",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        vector_search_dimensions=1536,
        vector_search_profile_name="my-vector-profile",
        hidden=False,
    ),
]

# Define the vector search configuration
vector_search = VectorSearch(
    profiles=[
        VectorSearchProfile(
            name="my-vector-profile",
            algorithm_configuration_name="my-vector-config",
            vectorizer_name="my-vectorizer",
        )
    ],
    algorithms=[
        HnswAlgorithmConfiguration(
            name="my-vector-config",
            kind="hnsw",
            parameters=HnswParameters(metric=VectorSearchAlgorithmMetric.COSINE),
        )
    ],
    vectorizers=[
        AzureOpenAIVectorizer(
            vectorizer_name="my-vectorizer",
            parameters=AzureOpenAIVectorizerParameters(
                # The endpoint and API key must belong to the same Azure OpenAI resource
                resource_url=AZURE_OPENAI_EMBED_ENDPOINT,
                # Use the same embedding deployment that the embedding skill uses
                deployment_name="YOUR-EMBEDDING-DEPLOYMENT-NAME",
                api_key=AZURE_OPENAI_EMBED_API_KEY,
                model_name=AzureOpenAIModelName.TEXT_EMBEDDING_3LARGE,
            ),
        )
    ],
)


# Define the index
index = SearchIndex(
    name=f"{INDEX_NAME}-index",
    fields=fields,
    vector_search=vector_search,
)

# Create or update the index
result = index_client.create_or_update_index(index)
print(f"Index '{result.name}' created or updated")
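
One detail worth checking in the index definition above: the `text_embedding` field declares 1536 dimensions, while `text-embedding-3-large` produces 3072-dimensional vectors unless a smaller size is requested. The sketch below is a plain-Python sanity check, not part of the Azure SDK; `check_dimensions` is a hypothetical helper, and for simplicity it ignores the fact that `text-embedding-ada-002` does not support reduced sizes.

```python
# Default (maximum) output sizes of the Azure OpenAI embedding models.
DEFAULT_EMBEDDING_DIMENSIONS = {
    "text-embedding-ada-002": 1536,
    "text-embedding-3-small": 1536,
    "text-embedding-3-large": 3072,
}


def check_dimensions(model_name, field_dimensions, requested_dimensions=None):
    """Return None if the sizes agree, else a message describing the mismatch."""
    default = DEFAULT_EMBEDDING_DIMENSIONS[model_name]
    produced = default if requested_dimensions is None else requested_dimensions
    if produced > default:
        return f"{model_name} cannot produce more than {default} dimensions"
    if produced != field_dimensions:
        return (
            f"{model_name} will produce {produced}-dimensional vectors, "
            f"but the index field expects {field_dimensions}"
        )
    return None


# Without an explicit size the model's default applies and the check fails.
print(check_dimensions("text-embedding-3-large", 1536))
# Requesting 1536 dimensions from the model matches the index field.
print(check_dimensions("text-embedding-3-large", 1536, requested_dimensions=1536))
```

If the first check reports a mismatch for your configuration, either request 1536 dimensions when generating embeddings or declare the field with the model's default size.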

In [None]:
# Initialize the SearchIndexerClient
client = SearchIndexerClient(
    endpoint=SEARCH_SERVICE_ENDPOINT,
    credential=azure_search_credential,
)

def create_image_captioning_skill():
    """Custom skill for image captioning via a deployed Azure function."""
    return WebApiSkill(
        name="Image Captioning Custom Skill",
        description="Generates captions for images using a custom API",
        context="/document/normalized_images/*",
        uri=f"{CUSTOM_PHI3_FUNCTION_BASE_URL}/api/custom_skill",
        http_method="POST",
        timeout="PT1M",
        batch_size=2,
        http_headers={"scenario": "image-captioning"},
        inputs=[InputFieldMappingEntry(name="image", source="/document/normalized_images/*")],
        outputs=[OutputFieldMappingEntry(name="generative-caption", target_name="caption")]
    )

def create_merge_skill():
    """Text merge skill provided by Microsoft to merge content."""
    return MergeSkill(
        name="MSFT's Native Text Merge Skill",
        description="Merges text content and captions",
        context="/document",
        inputs=[
            InputFieldMappingEntry(name="text", source="/document/content"),
            InputFieldMappingEntry(name="itemsToInsert", source="/document/normalized_images/*/caption"),
            InputFieldMappingEntry(name="offsets", source="/document/normalized_images/*/contentOffset")
        ],
        outputs=[OutputFieldMappingEntry(name="mergedText", target_name="merged_content")],
        insert_pre_tag=" ",
        insert_post_tag=" "
    )

def create_text_summarization_skill():
    """Custom skill for text summarization using a custom API."""
    return WebApiSkill(
        name="Text Summarization Custom Skill",
        description="Summarizes merged content using a custom model",
        context="/document/merged_content",
        uri=f"{CUSTOM_PHI3_FUNCTION_BASE_URL}/api/custom_skill",
        http_method="POST",
        timeout="PT1M",
        batch_size=2,
        http_headers={"scenario": "summarization"},
        inputs=[InputFieldMappingEntry(name="text", source="/document/merged_content")],
        outputs=[OutputFieldMappingEntry(name="generative-summary", target_name="summary")]
    )

def create_split_skill():
    """Skill to split merged text into chunks/pages."""
    return SplitSkill(
        name="MSFT Text Split Skill",
        description="Splits text into pages",
        context="/document/merged_content",
        text_split_mode="pages",
        maximum_page_length=512,
        page_overlap_length=20,
        default_language_code="en",
        inputs=[InputFieldMappingEntry(name="text", source="/document/merged_content")],
        outputs=[OutputFieldMappingEntry(name="textItems", target_name="pages")]
    )

def create_openai_embedding_skill():
    """Defines the embedding skill using Azure OpenAI for text embeddings."""
    return AzureOpenAIEmbeddingSkill(
        name="AOAI Embedding Skill",
        description="Generates embeddings using Azure OpenAI",
        context="/document/merged_content/pages/*",
        # The endpoint and API key must belong to the same Azure OpenAI resource
        resource_url=AZURE_OPENAI_EMBED_ENDPOINT,
        # Replace with your deployment name if it differs from the model name
        deployment_name="text-embedding-3-large",
        api_key=AZURE_OPENAI_EMBED_API_KEY,
        model_name=AzureOpenAIModelName.TEXT_EMBEDDING_3LARGE,
        # Match the index field's vector_search_dimensions (1536);
        # text-embedding-3-large defaults to 3072 dimensions otherwise
        dimensions=1536,
        inputs=[
            InputFieldMappingEntry(name="text", source="/document/merged_content/pages/*")
        ],
        outputs=[OutputFieldMappingEntry(name="embedding", target_name="text_embedding")]
    )

def create_entity_extraction_skill():
    """Custom skill for entity extraction using a custom API."""
    return WebApiSkill(
        name="Entity Extraction Custom Skill",
        description="Extracts entities using a custom model",
        context="/document/merged_content/pages/*",
        uri=f"{CUSTOM_PHI3_FUNCTION_BASE_URL}/api/custom_skill",
        http_method="POST",
        timeout="PT1M",
        batch_size=2,
        http_headers={"scenario": "entity-recognition"},
        inputs=[InputFieldMappingEntry(name="text", source="/document/merged_content/pages/*")],
        outputs=[OutputFieldMappingEntry(name="entities", target_name="entities")]
    )

# Create the skillset with all skills
def create_skillset(client, skillset_name):
    skillset = SearchIndexerSkillset(
        name=skillset_name,
        description="Skillset to chunk documents, use language models to enrich my index, and generate embeddings",
        skills=[
            create_image_captioning_skill(),
            create_merge_skill(),
            create_text_summarization_skill(),
            create_split_skill(),
            create_openai_embedding_skill(),
            create_entity_extraction_skill()
        ],
        index_projection=SearchIndexerIndexProjection(
            selectors=[
                SearchIndexerIndexProjectionSelector(
                    target_index_name=f"{INDEX_NAME}-index",
                    parent_key_field_name="parent_id",
                    source_context="/document/merged_content/pages/*",
                    mappings=[
                        InputFieldMappingEntry(name="text_embedding", source="/document/merged_content/pages/*/text_embedding"),
                        InputFieldMappingEntry(name="chunk", source="/document/merged_content/pages/*"),
                        InputFieldMappingEntry(name="parent_summary", source="/document/merged_content/summary"),
                        InputFieldMappingEntry(name="entities", source="/document/merged_content/pages/*/entities"),
                        InputFieldMappingEntry(name="title", source="/document/title"),
                        InputFieldMappingEntry(name="metadata_storage_path",source="/document/metadata_storage_path")
                    ]
                )
            ],
            parameters=SearchIndexerIndexProjectionsParameters(projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS)
        )
    )
    try:
        client.create_or_update_skillset(skillset)
        print(f"{skillset.name} created or updated successfully")
    except Exception as e:
        print(f"Failed to create or update skillset {skillset_name}: {e}")

# Usage example
skillset_name = f"{INDEX_NAME}-skillset"
create_skillset(client, skillset_name)
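
To build intuition for what the split skill and the index projection do together, here is a simplified, service-free sketch. It uses the same `maximum_page_length` and `page_overlap_length` values as the skillset above and then fans one parent document out into one search document per page. This is an illustration only: the real Text Split skill tries to break at sentence boundaries rather than fixed character offsets, the service generates its own `chunk_id` values, and both function names are hypothetical.

```python
def split_into_pages(text, maximum_page_length=512, page_overlap_length=20):
    """Naive character-window splitter with overlap between consecutive pages."""
    if page_overlap_length >= maximum_page_length:
        raise ValueError("page_overlap_length must be smaller than maximum_page_length")
    step = maximum_page_length - page_overlap_length
    pages = []
    for start in range(0, len(text), step):
        pages.append(text[start:start + maximum_page_length])
        if start + maximum_page_length >= len(text):
            break  # this page already reaches the end of the text
    return pages


def project_chunks(parent_id, title, summary, pages):
    """Build one search document per page, repeating the parent-level fields."""
    return [
        {
            # Hypothetical key format; the service assigns real chunk keys.
            "chunk_id": f"{parent_id}_pages_{i}",
            "parent_id": parent_id,
            "title": title,
            "parent_summary": summary,
            "chunk": page,
        }
        for i, page in enumerate(pages)
    ]


sample_text = "".join(chr(97 + i % 26) for i in range(1000))
pages = split_into_pages(sample_text)
documents = project_chunks("doc1", "report.pdf", "A short summary.", pages)
print([len(page) for page in pages])
print(documents[1]["chunk_id"], documents[1]["parent_id"])
```

Every chunk carries the parent's title and summary, which is why `parent_summary` in the projection maps from `/document/merged_content/summary` rather than from a per-page path.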


In [None]:
# Initialize the SearchIndexerClient
indexer_client = SearchIndexerClient(
    endpoint=SEARCH_SERVICE_ENDPOINT,
    credential=azure_search_credential,
)

def create_and_run_indexer(
    indexer_client, indexer_name, skillset_name, index_name, data_source_name
):
    """
    Creates and runs an indexer to index documents with embeddings.
    """
    try:
        # Define the indexer with necessary parameters
        indexer = SearchIndexer(
            name=indexer_name,
            description="Indexer for Ignite Demo with OpenAI Embeddings",
            skillset_name=skillset_name,
            target_index_name=index_name,
            data_source_name=data_source_name,
            parameters=IndexingParameters(
                configuration=IndexingParametersConfiguration(
                    data_to_extract="contentAndMetadata",
                    parsing_mode="default",
                    image_action="generateNormalizedImages",
                    query_timeout=None,
                ),
            ),
            field_mappings=[
                FieldMapping(
                    source_field_name="metadata_storage_name", target_field_name="title"
                ),
                FieldMapping(
                    source_field_name="metadata_storage_path",
                    target_field_name="metadata_storage_path",
                ),
            ],
        )

        # Create or update the indexer
        indexer_client.create_or_update_indexer(indexer)
        print(f"{indexer_name} created or updated successfully.")

        # Run the indexer
        indexer_client.run_indexer(indexer_name)
        print(f"{indexer_name} is running. Please wait for indexing to complete.")

    except Exception as e:
        print(f"Failed to create or run indexer {indexer_name}: {e}")


# Main workflow
data_source_name = f"{INDEX_NAME}-blob"
indexer_name = f"{INDEX_NAME}-indexer"
skillset_name = f"{INDEX_NAME}-skillset"

create_and_run_indexer(
    indexer_client, indexer_name, skillset_name, f"{INDEX_NAME}-index", data_source_name
)

In [None]:
import time

def get_indexer_run_status(name):
    """Return the status of the indexer's most recent run."""
    last_result = indexer_client.get_indexer_status(name).last_result
    # No result yet means the first run has not reported; treat it as in progress
    return IndexerExecutionStatus.IN_PROGRESS if last_result is None else last_result.status

indexer_status = get_indexer_run_status(indexer_name)
while indexer_status == IndexerExecutionStatus.IN_PROGRESS:
    print(f"Indexer '{indexer_name}' is still running. Current status: '{indexer_status}'.")
    time.sleep(10)  # pause between polls instead of calling the service in a tight loop
    indexer_status = get_indexer_run_status(indexer_name)

print(f"Indexer '{indexer_name}' finished with status '{indexer_status}'.")
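
For longer indexing runs, the polling pattern above can be wrapped in a reusable helper that backs off between polls and gives up after a deadline. The following is a sketch under stated assumptions: `wait_for_completion` is a hypothetical helper, the delays and timeout are arbitrary defaults, and the status source is any zero-argument callable, so the example runs against a fake sequence instead of a live service.

```python
import time


def wait_for_completion(get_status, in_progress="inProgress", delay=5.0,
                        max_delay=60.0, timeout=1800.0, sleep=time.sleep):
    """Poll get_status() with exponential backoff until it stops reporting in_progress."""
    waited = 0.0
    status = get_status()
    while status == in_progress:
        if waited >= timeout:
            raise TimeoutError(f"still {in_progress!r} after {waited:.0f} seconds")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)  # double the wait, up to max_delay
        status = get_status()
    return status


# Demonstration with a fake status sequence and a no-op sleep.
fake_statuses = iter(["inProgress", "inProgress", "success"])
print(wait_for_completion(lambda: next(fake_statuses), sleep=lambda seconds: None))

# In this notebook you would pass a callable that reads the indexer status, e.g.
# wait_for_completion(
#     lambda: indexer_client.get_indexer_status(indexer_name).last_result.status,
#     in_progress=IndexerExecutionStatus.IN_PROGRESS,
# )
```

The commented call assumes `last_result` is not `None`; guard for that case if the indexer has never run.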

In [None]:
# Hybrid Search
query = "What are Contoso's goals for migrating to the cloud?"

search_client = SearchClient(SEARCH_SERVICE_ENDPOINT, f"{INDEX_NAME}-index", azure_search_credential)
vector_query = VectorizableTextQuery(text=query, k_nearest_neighbors=1, fields="text_embedding")

results = search_client.search(
    search_text=query,
    vector_queries=[vector_query],
    select=["title", "chunk", "metadata_storage_path"],
    top=1,
)

for result in results:
    print(f"Title: {result['title']}")
    print(f"Content: {result['chunk']}")
    print(f"Score: {result['@search.score']}")
    print(f"Path: {result['metadata_storage_path']}")
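
The query above is hybrid: it sends both `search_text` and a vector query, and Azure AI Search merges the two ranked result lists with Reciprocal Rank Fusion (RRF), so `@search.score` is a fused rank score rather than a raw keyword or similarity score. The sketch below shows the idea in plain Python. The function name and the chunk IDs are made up for illustration, and `k=60` is the constant commonly cited for RRF; treat the exact value the service uses as an assumption.

```python
def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document IDs into one list of (id, score)."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank) for every document it contains.
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for a stable order.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


keyword_ranking = ["chunk_a", "chunk_b", "chunk_c"]
vector_ranking = ["chunk_b", "chunk_d", "chunk_a"]
for doc_id, score in reciprocal_rank_fusion([keyword_ranking, vector_ranking]):
    print(f"{doc_id}: {score:.4f}")
```

Documents that appear near the top of both lists rise above documents that rank first in only one, which is the main reason hybrid retrieval tends to be more robust than either method alone.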