In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Building a Multimodal Chatbot for Warranty Claims using Gemini and Vector Search in Vertex AI

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/retrieval-augmented-generation/retail_warranty_claim_chatbot.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Run in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fgemini%2Fuse-cases%2Fretrieval-augmented-generation%2Fretail_warranty_claim_chatbot.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Run in Colab Enterprise
    </a>
  </td>      
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/retrieval-augmented-generation/retail_warranty_claim_chatbot.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/use-cases/retrieval-augmented-generation/retail_warranty_claim_chatbot.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
    </a>
  </td>
</table>

<div style="clear: both;"></div>


| | |
|-|-|
|Author(s) | [Zachary Thorman](https://github.com/zthor5), [Charles Elliott](https://github.com/charleselliott) |

## Overview

This notebook walks through building a warranty claims chatbot that uses Vector Search and the Gemini API in Vertex AI on Google Cloud. For the purposes of this notebook, we use a fictitious shoe startup called [AquaStride](https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/aquastride-company-overview.pdf).

 - For teaching purposes, you'll ingest the sample data by converting PDFs -> Images -> Text -> Embeddings -> Vector DB.
 - In this notebook, you will create a custom RAG implementation, deployed on Vector Search. You can also use other managed services like [Vertex AI Search](https://cloud.google.com/enterprise-search?hl=en) as a vector database.
 - You'll also use [Function Calling](https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/function-calling) in the Gemini API to route each user intent to the function that handles it.

The sample code shown in this notebook originally appeared in the [Building out code pipelines for your Gen AI customer service app](https://www.youtube.com/live/Zm255g3URpw?feature=shared&t=2845) session at the [Google Startup School](https://startup.google.com/programs/startup-school/) on May 28th, 2024.

<img src="https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/user-flow-diagram.png" width="70%">

## Getting Started

In this section, you will install the necessary dependencies and define the Google Cloud project where you want to connect to Vertex AI.

### Install Vertex AI SDK and other required packages

In [None]:
%pip install --upgrade -q pymupdf gradio google-cloud-aiplatform langchain_google_vertexai pillow regex langchain==0.1.20

### Import libraries

In [None]:
import base64
from datetime import datetime

# File system operations
import os

# Used to detect whether the notebook is running in Colab
import sys

# Timing utilities for pausing between API calls
import time

# Unique ID generation
import uuid

# Image handling and PDF rendering (PyMuPDF)
from PIL import Image as PIL_Image
import fitz

# Vertex AI SDK, used here for Vector Search
from google.cloud import aiplatform
import gradio as gr

# Import LangChain components
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.document_loaders import DataFrameLoader
import pandas as pd
import regex as re

# Initialize Vertex AI
import vertexai
from vertexai.generative_models import (
    FunctionDeclaration,
    GenerativeModel,
    Image,
    Part,
    Tool,
)
from vertexai.language_models import TextEmbeddingModel
import vertexai.preview.generative_models as generative_models
from vertexai.preview.generative_models import ToolConfig

### Restart runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which restarts the current kernel.

The restart might take a minute or longer. After it's restarted, rerun the **Import libraries** cell above (a restart clears all imports), then continue to the next step.

In [None]:
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ Wait for the kernel to finish restarting before you continue. ⚠️</b>
</div>

### Authenticate your notebook environment (Colab only)

If you are running this notebook on Google Colab, run the cell below to authenticate your environment.

This step is not required if you are using [Vertex AI Workbench](https://cloud.google.com/vertex-ai-workbench).

In [None]:
# Additional authentication is required for Google Colab
if "google.colab" in sys.modules:
    # Authenticate user to Google Cloud
    from google.colab import auth

    auth.authenticate_user()

### Define Google Cloud project information, initialize Vertex AI, and add Secrets

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).
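
A common mistake is to run the next cell with the `[your-project-id]` placeholder still in place. The sketch below is an optional format check, not part of the original notebook: the helper name `is_valid_project_id` is hypothetical, and the pattern encodes the documented project ID rules (6 to 30 characters; lowercase letters, digits, and hyphens; starts with a letter; does not end with a hyphen). It does not confirm that the project exists or that you have access to it.

```python
import re

# Project ID format: 6-30 characters, starts with a lowercase letter, contains
# only lowercase letters, digits, and hyphens, and does not end with a hyphen.
_PROJECT_ID_PATTERN = re.compile(r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$")


def is_valid_project_id(project_id: str) -> bool:
    """Return True if project_id matches the project ID format (hypothetical helper)."""
    return bool(_PROJECT_ID_PATTERN.match(project_id))


print(is_valid_project_id("[your-project-id]"))  # the unchanged placeholder
print(is_valid_project_id("my-demo-project-123"))  # a made-up, well-formed ID
```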

In [None]:
# Set your own project ID and location to run this notebook in your environment.

PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}


vertexai.init(project=PROJECT_ID, location=LOCATION)

### Initializing Gemini and Text Embedding models

Here we initialize the models that will be used for embeddings & answering questions against the PDFs.

In [None]:
# Defines the Generative Models Configuration
generation_config = {
    "max_output_tokens": 8192,
    "temperature": 0,
    "top_p": 0.95,
}

# Loading Gemini Model
multimodal_model = GenerativeModel(
    "gemini-2.0-flash", generation_config=generation_config
)

# Initializing embedding model
text_embedding_model = TextEmbeddingModel.from_pretrained("text-embedding-005")

# Download backup blank file to use if needed when no results (Not Required for RAG)
! wget -O no-matching-pages.png https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/no-matching-pages.png

# Helper Functions for RAG


In this section, you will ingest sample data by converting PDFs -> Images -> Text -> Embeddings -> Vector DB.

The cells below define helper functions used in later sections. Run the group of collapsed cells at once, or review them at your own pace.

### Create and clean images folder

In [None]:
# Pass The folder path for storing the images


def create_clean_image_folder(Image_Path):
    # Create the directory if it doesn't exist
    if not os.path.exists(Image_Path):
        os.makedirs(Image_Path)
    image_star = Image_Path + "*"
    !rm -rf {image_star}

### Split PDF to images and extract data using Gemini

This module processes a set of images, extracting text and tabular data using a multimodal model (Gemini).
It handles potential errors, stores the extracted information in a DataFrame, and saves the results to a CSV file.

You can modify this approach in a number of ways, such as to use [Document AI](https://cloud.google.com/blog/products/ai-machine-learning/document-ai-custom-extractor-powered-by-generative-ai-is-now-ga) for OCR Parsing. Feel free to try alternatives!
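
The error handling in the cell below follows a pause-and-retry pattern: when a model call fails (for example, because of quota limits), wait and try again, then give up after a fixed number of attempts. The pattern can be sketched in isolation as follows. The helper `call_with_retries` and the stand-in `flaky_extract` are hypothetical names used only for illustration; the defaults of 5 attempts and a 12-second pause mirror the values in the notebook.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 5,
    pause_seconds: float = 12.0,
) -> Optional[T]:
    """Call fn until it succeeds or max_attempts is reached.

    Returns fn's result, or None if every attempt raised an exception.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as err:  # broad on purpose, like the notebook's handler
            print(f"Attempt {attempt} failed: {err}")
            if attempt < max_attempts:
                time.sleep(pause_seconds)
    return None


# Stand-in for a model call that fails twice before succeeding.
calls = {"count": 0}


def flaky_extract() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("quota exceeded")
    return "extracted text"


result = call_with_retries(flaky_extract, pause_seconds=0)
print(result)
```

Wrapping each call separately means a failed table extraction does not force the text extraction for the same page to run again.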

In [None]:
def split_pdf_extract_data(pdfList, folder_uri):
    # To get better resolution
    zoom_x = 2.0  # horizontal zoom
    zoom_y = 2.0  # vertical zoom
    mat = fitz.Matrix(zoom_x, zoom_y)  # zoom factor 2 in each dimension

    for indiv_Pdf in pdfList:
        doc = fitz.open(indiv_Pdf)  # open document
        for page in doc:  # iterate through the pages
            pix = page.get_pixmap(matrix=mat)  # render page to an image
            outpath = f"{folder_uri}{indiv_Pdf}_{page.number}.png"
            pix.save(outpath)  # store image as a PNG

    # Define the path where images are located
    image_names = os.listdir(folder_uri)
    Max_images = len(image_names)

    # Create empty lists to store image information
    page_source = []
    page_content = []
    page_id = []

    p_id = 0  # Initialize image ID counter
    rest_count = 0  # Initialize counter for error handling

    while p_id < Max_images:
        try:
            # Construct the full path to the current image
            image_path = folder_uri + image_names[p_id]

            # Load the image
            image = Image.load_from_file(image_path)

            # Generate prompts for text and table extraction
            prompt_text = "Extract all text content in the image"
            prompt_table = (
                "Detect table in this image. Extract content maintaining the structure"
            )
            prompt_image = "Detect images in this image. Extract content in the form of alternative text or subtitles to each sub-image"

            # Extract text using your multimodal model
            contents = [image, prompt_text]
            response = multimodal_model.generate_content(contents)
            text_content = response.text

            # Extract table using your multimodal model
            contents = [image, prompt_table]
            response = multimodal_model.generate_content(contents)
            table_content = response.text

            # Extract information from images (i.e. Subtitle / Alternative text). | Currently Disabled
            # contents = [image, prompt_image]
            # response = multimodal_model.generate_content(contents)
            # image_content = response.text

            # Log progress and store results
            print(f"processed image no: {p_id}")
            page_source.append(image_path)
            page_content.append(
                text_content + "\n" + table_content
            )  # + "\n" + image_content)
            page_id.append(p_id)
            p_id += 1
            rest_count = 0  # Reset the retry counter after a success

        except Exception as err:
            # Handle errors during processing
            print(err)
            print("Taking Some Rest")
            time.sleep(
                12
            )  # Pause for 12 seconds to stay within the default Vertex AI quota
            rest_count += 1
            if rest_count == 5:  # Give up on this image after 5 consecutive failures
                rest_count = 0
                print(f"Cannot process image no: {image_path}")
                p_id += 1  # Move to the next image

    # Create a DataFrame to store extracted information
    df = pd.DataFrame(
        {"page_id": page_id, "page_source": page_source, "page_content": page_content}
    )
    del page_id, page_source, page_content  # Conserve memory
    df.head()  # Preview the DataFrame

    return df

### Create the chunks and embeddings
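
To build intuition for the `chunk_size` and `chunk_overlap` parameters used below, here is a simplified fixed-window splitter. It is an illustration only: the function `chunk_text` is hypothetical, and LangChain's `CharacterTextSplitter` works differently (it splits on a separator and merges the pieces), so its chunk boundaries will not match this sketch.

```python
def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into windows of chunk_size characters that overlap by chunk_overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


chunks = chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij']
```

Each chunk repeats the last character of the previous one, which is the role the 200-character overlap plays at the notebook's scale: text near a boundary appears in both neighboring chunks.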

In [None]:
def generate_text_embedding(text) -> list:
    """Text embedding with a Large Language Model."""
    embeddings = text_embedding_model.get_embeddings([text])
    vector = embeddings[0].values
    return vector

In [None]:
# Returns a chunked embeddings dataframe


def create_chunked_embeddings(df):
    # Create a DataFrameLoader to prepare data for LangChain
    loader = DataFrameLoader(df, page_content_column="page_content")

    # Load documents from the 'page_content' column of your DataFrame
    documents = loader.load()

    # Log the number of documents loaded
    print(f"# of documents loaded (pre-chunking) = {len(documents)}")

    # Create a text splitter to divide documents into smaller chunks
    text_splitter = CharacterTextSplitter(
        chunk_size=10000,  # Target size of approximately 10000 characters per chunk
        chunk_overlap=200,  # overlap between chunks
    )

    # Split the loaded documents
    doc_splits = text_splitter.split_documents(documents)

    # Add a 'chunk' ID to each document split's metadata for tracking
    for idx, split in enumerate(doc_splits):
        split.metadata["chunk"] = idx

    # Log the number of documents after splitting
    print(f"# of documents = {len(doc_splits)}")

    texts = [doc.page_content for doc in doc_splits]
    text_embeddings_list = []
    id_list = []
    page_source_list = []
    for doc in doc_splits:
        id = uuid.uuid4()
        text_embeddings_list.append(generate_text_embedding(doc.page_content))
        id_list.append(str(id))
        page_source_list.append(doc.metadata["page_source"])
        time.sleep(12)  # So that we don't run into Quota Issue

    # Creating a dataframe of ID, embeddings, page_source and text
    embedding_df = pd.DataFrame(
        {
            "id": id_list,
            "embedding": text_embeddings_list,
            "page_source": page_source_list,
            "text": texts,
        }
    )
    embedding_df.head()
    return embedding_df

### Save the embeddings in a JSON file
To load the embeddings into Vector Search, we need to save them in JSON Lines (JSONL) format, with one JSON object per line. See more information in the docs at [Input data format and structure](https://cloud.google.com/vertex-ai/docs/matching-engine/match-eng-setup/format-structure#data-file-formats).

First, export the `id` and `embedding` columns from the DataFrame in JSONL format, and save it.

Then, create a new Cloud Storage bucket and copy the file to it.
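
For reference, the JSONL export keeps only the `id` and `embedding` fields and writes one record per line. A minimal sketch using only the standard library looks like this; the helper `to_jsonl` and the tiny 3-dimensional vectors are made up for readability (the notebook's index is created with 768 dimensions).

```python
import json


def to_jsonl(records: list[dict]) -> str:
    """Serialize records as JSON Lines, keeping only 'id' and 'embedding'."""
    return "\n".join(
        json.dumps({"id": r["id"], "embedding": r["embedding"]}) for r in records
    )


# Hypothetical records; extra fields such as 'text' are dropped on export.
records = [
    {"id": "a1", "embedding": [0.1, 0.2, 0.3], "text": "first chunk"},
    {"id": "b2", "embedding": [0.4, 0.5, 0.6], "text": "second chunk"},
]
jsonl = to_jsonl(records)
print(jsonl)
```

The chunk text and page source stay in the DataFrame, which is why `get_answer` later looks them up by `id` after the vector search returns.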

In [None]:
def create_json_file(embedding_df, RAG_unique_identifier):
    # save id and embedding as a json file
    json_file_name = RAG_unique_identifier + ".json"
    jsonl_string = embedding_df[["id", "embedding"]].to_json(
        orient="records", lines=True
    )
    with open(json_file_name, "w") as f:
        f.write(jsonl_string)

    # Show the first few lines of the json file
    #! head -n 3 {json_file_name}
    return json_file_name

In [None]:
def upload_file_to_gcs(json_file_name, bucket_location):
    # Generates a unique ID for session
    UID = datetime.now().strftime("%m%d%H%M%S")
    # Creates a GCS bucket
    BUCKET_URI = f"gs://{bucket_location}--{UID}"
    ! gsutil mb -l $LOCATION -p {PROJECT_ID} {BUCKET_URI}
    ! gsutil cp {json_file_name} {BUCKET_URI}
    return BUCKET_URI

### Create an index in Vector Search

The embeddings are now ready to load into Vector Search. Its APIs are available under the [aiplatform](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform) package of the SDK.

Create a [MatchingEngineIndex](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.MatchingEngineIndex) with its `create_tree_ah_index` function (Matching Engine is the previous name of Vector Search).

In [None]:
def create_index(vec_search_index_name, bucket_location):
    return aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name=f"{vec_search_index_name}",
        contents_delta_uri=bucket_location,
        dimensions=768,
        approximate_neighbors_count=20,
        distance_measure_type="DOT_PRODUCT_DISTANCE",
    )

Calling the `create_tree_ah_index` function starts building an index. This takes a few minutes if the dataset is small, and about 50 minutes or more for larger datasets.

You can check status of the index creation on [the Vector Search Console > INDEXES tab](https://console.cloud.google.com/vertex-ai/matching-engine/indexes).



---

See [this document](https://cloud.google.com/vertex-ai/docs/vector-search/create-manage-index) for more details on creating your Index and the parameters.
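
The index above ranks vectors by dot product and returns approximate nearest neighbors. As a conceptual reference, the sketch below performs the exact, brute-force version of that ranking on a few made-up 2-dimensional vectors. It assumes that a higher dot product means a closer match; the function names are hypothetical and this is not how Vector Search is implemented internally.

```python
def dot(a: list[float], b: list[float]) -> float:
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def top_k_by_dot_product(
    query: list[float], vectors: dict[str, list[float]], k: int
) -> list[tuple[str, float]]:
    """Score every stored vector against the query and return the k best."""
    scored = [(vec_id, dot(query, vec)) for vec_id, vec in vectors.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Made-up vectors keyed by document ID.
vectors = {"doc-a": [1.0, 0.0], "doc-b": [0.0, 1.0], "doc-c": [0.5, 0.5]}
neighbors = top_k_by_dot_product([1.0, 0.0], vectors, k=2)
print(neighbors)  # [('doc-a', 1.0), ('doc-c', 0.5)]
```

Brute force scores every vector on every query, so its cost grows with the size of the dataset. An approximate index trades a little recall for much lower latency.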

### Create an index endpoint and deploy the index

To use the Index, you need to create an [Index Endpoint](https://cloud.google.com/vertex-ai/docs/vector-search/deploy-index-public). It works as a server instance accepting query requests for your Index.

You can view your public endpoints [on Google Cloud's Vertex Endpoints](https://console.cloud.google.com/vertex-ai/matching-engine/index-endpoints).

In [None]:
def create_Index_Endpoint(my_index, vec_search_index_name):
    my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
        display_name=f"{vec_search_index_name}",
        public_endpoint_enabled=True,
    )

    DEPLOYED_INDEX_NAME = vec_search_index_name.replace(
        "-", "_"
    )  # Can't have '-' in deployment name, only alphanumeric and _ allowed
    UID = datetime.now().strftime("%m%d%H%M%S")
    DEPLOYED_INDEX_ID = f"{DEPLOYED_INDEX_NAME}_{UID}"
    # deploy the Index to the Index Endpoint
    my_index_endpoint.deploy_index(index=my_index, deployed_index_id=DEPLOYED_INDEX_ID)

    return my_index_endpoint, DEPLOYED_INDEX_ID

This demo utilizes a [Public Endpoint](https://cloud.google.com/vertex-ai/docs/vector-search/setup/setup#choose-endpoint) and does not support [Virtual Private Cloud (VPC)](https://cloud.google.com/vpc/docs/private-services-access). Unless you have a specific requirement for VPC, it is recommended to use a Public Endpoint.

Despite the term "public" in its name, it does not imply open access to the public internet. Without explicit IAM permissions, no one can access the endpoint.

The first time you deploy an Index to an Index Endpoint, it takes around 25 minutes to automatically build and initiate the backend. After the first deployment, deployments finish in seconds. To see the status of the index deployment, open [the Vector Search Console > INDEX ENDPOINTS tab](https://console.cloud.google.com/vertex-ai/matching-engine/index-endpoints) and click the Index Endpoint.
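
The deploy cell above derives the deployed index ID by replacing hyphens with underscores and appending a timestamp, because (per the comment in that cell) the ID allows only alphanumeric characters and underscores. A slightly more defensive variant is sketched below. The helper `make_deployed_index_id` is hypothetical, and it takes the timestamp as an argument so the result is reproducible.

```python
import re
from datetime import datetime


def make_deployed_index_id(index_name: str, now: datetime) -> str:
    """Build an ID from index_name using only letters, digits, and underscores."""
    # Replace every disallowed character, not just hyphens.
    safe_name = re.sub(r"[^A-Za-z0-9_]", "_", index_name)
    return f"{safe_name}_{now.strftime('%m%d%H%M%S')}"


deployed_id = make_deployed_index_id(
    "vec-search-index-aquastride", datetime(2024, 5, 28, 9, 30, 0)
)
print(deployed_id)  # vec_search_index_aquastride_0528093000
```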

### Ask Questions to the PDF
The functions below implement a question-answering (QA) system. They use Vector Search to find relevant information in the dataset, then use the LLM to generate and validate the final answer to a user's query.
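
The core control flow is a fallback loop: try the best-matching context first, and move on to the next retrieved context only if the generated answer is rejected. The sketch below isolates that loop with plain functions standing in for the LLM call and the classification check. All names here (`first_acceptable_answer`, `fake_answer`, `fake_check`) are hypothetical.

```python
from typing import Callable, Optional, Sequence


def first_acceptable_answer(
    contexts: Sequence[str],
    answer_fn: Callable[[str], str],
    is_acceptable: Callable[[str], bool],
) -> tuple[Optional[str], Optional[int]]:
    """Return (answer, context_index) for the first accepted answer, else (None, None)."""
    for index, context in enumerate(contexts):
        answer = answer_fn(context)
        if is_acceptable(answer):
            return answer, index
    return None, None


# Stand-in for the LLM call: only answers when the context mentions an owner.
def fake_answer(context: str) -> str:
    return "Ada Lovelace" if "owner" in context else "Information Not Present"


# Stand-in for the classification check.
def fake_check(answer: str) -> bool:
    return "Not Present" not in answer


contexts = ["shipping policy text", "owner: Ada Lovelace", "returns policy text"]
answer, used_index = first_acceptable_answer(contexts, fake_answer, fake_check)
print(answer, used_index)  # Ada Lovelace 1
```

Returning the index of the context that produced the answer is what lets the caller show the matching source page next to the response.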

In [None]:
def Test_LLM_Response(txt):
    """
    Determines whether a given text response generated by an LLM indicates a lack of information.

    Args:
        txt (str): The text response generated by the LLM.

    Returns:
        bool: True if the LLM's response suggests it was able to generate a meaningful answer,
              False if the response indicates it could not find relevant information.

    This function works by presenting a formatted classification prompt to the LLM (`multimodal_model`).
    The prompt includes the original text and specific categories indicating whether sufficient information was available.
    The function analyzes the LLM's classification output to make the determination.
    """

    classification_prompt = f""" Classify the text as one of the following categories:
        -Information Present
        -Information Not Present
        Text=The provided context does not contain information.
        Category:Information Not Present
        Text=I cannot answer this question from the provided context.
        Category:Information Not Present
        Text:{txt}
        Category:"""
    classification_response = multimodal_model.generate_content(
        classification_prompt
    ).text

    if "Not Present" in classification_response:
        return False  # Indicates that the LLM couldn't provide an answer
    else:
        return True  # Suggests the LLM generated a meaningful response


def get_prompt_text(question, context):
    """
    Generates a formatted prompt string suitable for a language model, combining the provided question and context.

    Args:
        question (str): The user's original question.
        context (str): The relevant text to be used as context for the answer.

    Returns:
        str: A formatted prompt string with placeholders for the question and context, designed to guide the language model's answer generation.
    """
    prompt = """
      Answer the question using the context below. Respond with only information from the text provided
      Question: {question}
      Context : {context}
      """.format(
        question=question, context=context
    )
    return prompt


def get_answer(
    embedding_df, my_index_endpoint, DEPLOYED_INDEX_ID, query="No Query was provided."
):
    """
    Retrieves an answer to a provided query using multimodal RAG.

    This function leverages a vector search system to find relevant text documents from a
    pre-indexed store of multimodal data. Then, it uses a large language model (LLM) to generate
    an answer, using the retrieved documents as context.

    Args:
        embedding_df (pd.DataFrame): DataFrame with 'id', 'text', and 'page_source' columns
                                     for every indexed chunk.
        my_index_endpoint: The Vector Search index endpoint to query.
        DEPLOYED_INDEX_ID (str): The ID of the deployed index on that endpoint.
        query (str): The user's original query.

    Returns:
        tuple: A pair (result, page_source):
            * result (str): The LLM-generated answer.
            * page_source (str): Path to the page image behind the answer, or the
                                 default "./no-matching-pages.png" if no answer was accepted.
    """

    neighbor_index = 0  # Initialize index for tracking the most relevant document
    answer_found_flag = 0  # Flag to signal if an acceptable answer is found
    result = ""  # Initialize the answer string
    # Use a default image if the reference is not found
    page_source = "./no-matching-pages.png"  # Initialize the blank image
    query_embeddings = generate_text_embedding(
        query
    )  # Generate embeddings for the query

    response = my_index_endpoint.find_neighbors(
        deployed_index_id=DEPLOYED_INDEX_ID,
        queries=[query_embeddings],
        num_neighbors=5,
    )  # Retrieve up to 5 relevant documents from the vector store

    while answer_found_flag == 0 and neighbor_index < len(response[0]):
        context = embedding_df[
            embedding_df["id"] == response[0][neighbor_index].id
        ].text.values[
            0
        ]  # Extract text context from the relevant document

        prompt = get_prompt_text(
            query, context
        )  # Create a prompt using the question and context
        result = multimodal_model.generate_content(
            prompt
        ).text  # Generate an answer with the LLM

        if Test_LLM_Response(result):
            answer_found_flag = 1  # Exit loop when getting a valid response
        else:
            neighbor_index += (
                1  # Try the next retrieved document if the answer is unsatisfactory
            )

    if answer_found_flag == 1:
        page_source = embedding_df[
            embedding_df["id"] == response[0][neighbor_index].id
        ].page_source.values[
            0
        ]  # Extract image_path from the relevant document
    return result, page_source

# Create a RAG endpoint

In this section, you will load sample data into a Vector Search endpoint. In this example, you'll use PDF files that contain [a company overview](https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/aquastride-company-overview.pdf) and a [list of product SKUs](https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/aquastride-sku-sn-database.pdf).

It is **recommended** for production workloads to use a managed database for improved performance and efficiency.

## Create RAG Function

In [None]:
def create_RAG(RAG_unique_identifier, rag_list_pdfs):
    # Creates a Unique folder for the segmented PDF images. (Each page of the PDF is converted into a .PNG)
    folder_url = f"./{RAG_unique_identifier}_images/"
    create_clean_image_folder(folder_url)

    # Creates the embeddings dataframe of the PDF Images.
    company_dataframe = split_pdf_extract_data(rag_list_pdfs, folder_url)
    company_embeddings_dataframe = create_chunked_embeddings(company_dataframe)

    # Creates unique names for the Google Cloud Vector Search & GCS Bucket URL.
    vec_search_index_name = f"vec-search-index-{RAG_unique_identifier}"
    bucket_name = f"vec-search-bucket-{RAG_unique_identifier}"

    # Uploads the embeddings to GCS as a JSON file.
    json_file_name = create_json_file(
        company_embeddings_dataframe, RAG_unique_identifier
    )
    bucket_location = upload_file_to_gcs(json_file_name, bucket_name)

    # This function may take up to 25 minutes to run to deploy the custom Vector Search to a Public Endpoint.
    index = create_index(vec_search_index_name, bucket_location)
    my_index_endpoint, index_id = create_Index_Endpoint(index, vec_search_index_name)

    # Create a reusable Object for each Rag Model to call upon
    RAG_model_info = {
        "bucket_uri": bucket_location,
        "index": index,
        "embeddings_dataframe": company_embeddings_dataframe,
        "index_id": index_id,
        "my_index_endpoint": my_index_endpoint,
    }

    return RAG_model_info

## Testing the RAG performance

In [None]:
# Download your PDFs here using the wget command.
! wget -q -O aquastride_company.pdf 'https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/aquastride-company-overview.pdf'
! wget -q -O aquastride_DB.pdf 'https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/aquastride-sku-sn-database.pdf'

# Needs to be lowercase characters with no spaces; e.g. "test", "aquastride".
RAG_unique_identifier = "aquastride"  # @param {type: "string"}

# List the PDFs to be processed via the RAG Endpoint.
pdf_list = ["aquastride_company.pdf", "aquastride_DB.pdf"]

# Creates the RAG model endpoint on Vertex AI Vector Search.
rag_info = create_RAG(RAG_unique_identifier, pdf_list)

In [None]:
# Provide a query to test the deployed endpoint.
# For production, it is highly recommended to look up the extracted serial number in a database (e.g. Cloud SQL) instead.
query = "Provided the Serial_No (CZE5F6G7) and SKU (DepthStrider_23_Red_Norm), Determine the cx_name who purchased this serial number.\\n Output the Owner (cx_name) and the address (cx_address) in this format: \\nThank you [cx_name] for your purchase! We have you on file at [cx_address]."  # @param {type: "string"}

# Responds with the result of the query against the RAG endpoint & its source.
result, page_source = get_answer(
    rag_info["embeddings_dataframe"],
    rag_info["my_index_endpoint"],
    rag_info["index_id"],
    query,
)

# If the endpoint returns irrelevant context to the LLM, respond with the below.
if page_source == "./no-matching-pages.png":
    result = (
        "I could not find your answer within the Data. Can you rephrase your question?"
    )

# Print the result and its page source.
print(f"Response: {result}\nPage Source: {page_source}")

# Implement application logic and function calling

In this section, you will implement logic with [Gemini Function Calling](https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/function-calling) to handle tasks related to warranty claim support such as extracting information from images of shoe tags or inspecting pictures of shoes for physical damage.
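
With function calling, the model does not run code itself: it returns the name of a declared function plus arguments, and the application executes the matching function. The application side of that hand-off can be sketched as a simple dispatch table. The handler names and the `image_ref` argument below are hypothetical and are not the functions this notebook declares.

```python
from typing import Any, Callable


# Hypothetical handlers standing in for real warranty-claim logic.
def read_shoe_tag(image_ref: str) -> str:
    return f"reading tag from {image_ref}"


def assess_damage(image_ref: str) -> str:
    return f"assessing damage in {image_ref}"


HANDLERS: dict[str, Callable[..., str]] = {
    "read_shoe_tag": read_shoe_tag,
    "assess_damage": assess_damage,
}


def dispatch(function_name: str, arguments: dict[str, Any]) -> str:
    """Run the handler the model asked for, or report that none is registered."""
    handler = HANDLERS.get(function_name)
    if handler is None:
        return f"No handler registered for '{function_name}'"
    return handler(**arguments)


print(dispatch("read_shoe_tag", {"image_ref": "tag.png"}))
```

Keeping the mapping in one dictionary makes it easy to check that every declared function has a handler before the chatbot goes live.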

## Initialize and configure the Gemini model
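
The image prompt in this section asks the model to answer with a JSON object for shoe tags and damaged shoes, and with a plain sentence for any other image. Model output often arrives wrapped in a Markdown code fence, so downstream code usually needs a tolerant parser. The following is a minimal sketch, assuming the response is either fenced JSON, bare JSON, or plain text; `extract_json` is a hypothetical helper, not part of the Vertex AI SDK.

```python
import json
import re
from typing import Optional

FENCE = "`" * 3  # a Markdown code fence, built this way to keep the example readable
_FENCED_JSON = re.compile(FENCE + r"(?:json)?\s*(\{.*?\})\s*" + FENCE, re.DOTALL)


def extract_json(response_text: str) -> Optional[dict]:
    """Return the JSON object in response_text, or None if there is none."""
    match = _FENCED_JSON.search(response_text)
    candidate = match.group(1) if match else response_text.strip()
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


# A made-up response in the shape the shoe-tag branch of the prompt requests.
sample = (
    "Here is the tag data:\n"
    + FENCE
    + 'json\n{"brand": "AquaStride", "sizing": {"us": "10"}}\n'
    + FENCE
)
print(extract_json(sample))
print(extract_json("I am unable to help you with that image."))
```

A `None` result is a natural signal for the "Other" branch of the prompt, where the model replies with a sentence instead of JSON.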

In [None]:
image_determination_prompt = """
You will be provided with an image. Analyze the image and perform the following:

**Image Classification:**

* **Shoe Tag:** If the image is a shoe tag, extract the following information and generate a corresponding JSON schema:

    ```json
    {
        "brand": "[Brand Name/Website]",
        "product": "[Product Name/SKU]",
        "serialNumber": "[Serial Number]",
        "sizing": {
            "us": "[US Size]",
            "uk": "[UK Size]",
            "eur": "[EUR Size]",
            "chn": "[CHN Size]"
        },
        "madeIn": "[Manufacturing Location]"
    }
    ```

* **Damaged Shoe:** If the image shows a shoe with visible damage, assess the damage and generate a JSON schema for damage reporting:

    ```json
    {
        "damagedAreas": ["[Area 1]", "[Area 2]", ...],
        "damageType": "[Damage Type]",
        "severity": "[Severity Level]",
        "additionalNotes": "[Optional Additional Notes]"
    }
    ```

* **Other:** If the image is neither a shoe tag nor a damaged shoe, respond with: "I am unable to help you with that image because it does not help with warranty evaluations."

**Damage Assessment (if applicable):**

* **Identify Damaged Areas:** Pinpoint the specific locations of the damage on the shoe (e.g., sole, upper, laces).
* **Describe Damage Type:** Classify the type of damage (e.g., wear and tear, tear, stain, discoloration, structural damage).
* **Assess Severity:** Estimate the severity of the damage (e.g., minor, moderate, severe).

**Additional Notes:**

* **Clarity and Detail:** Be as specific as possible when describing the damage.
* **Image Quality:** If the image quality is too poor to assess the damage or extract information, indicate this in the output.
* **Human Intervention:** For complex or ambiguous cases, suggest that the customer contact a human agent for further assistance.
* **Missing Data Handling:** If any piece of information is not present, include the corresponding key in the JSON schema but leave the value as an empty string ("").
"""

system_prompt = """
**Persona:** You are Bubbles, AquaStride's friendly and helpful AI assistant, here to help with warranty claims. Your tone is positive and upbeat, but also efficient and clear.

**ReACT Framework:**

**Remember:** Keep track of the conversation history to know which step the customer is on.
**Evaluate:** Based on the customer's response, determine if they have provided the necessary information to move to the next step.
**Act:** Provide the appropriate response:
If the customer provides the required information, move to the next step.
If the customer is missing information, politely prompt them again.
If the customer is struggling, offer alternative solutions like contacting the call center.
**Confirm:** Before moving to the next step, ensure the customer understands and is ready to proceed.

**Return Process Dialogue Flow:**

**Step 1: Introduction and Explanation**

> Hey there! 👋 I'm Bubbles, your friendly AquaStride assistant! It sounds like you might need to make a warranty claim on a pair of our awesome shoes. That's no problem, I'm here to help you dive right into the process! 🌊 First things first, could you please share a picture of the inner shoe tag? This helps us quickly identify your shoes and get started. 👍

**Step 2: Image Upload and Verification (Secret Step)**

> (Upon receiving the image, extract the SKU, Serial Number, and Manufacturing Date. Verify this information against the customer database to confirm the purchase was from a legitimate retailer and check for existing customer details.)

**Step 3: Purchase Detail Confirmation**

> Thanks for sharing that! 😊 Based on the tag information, it looks like these shoes were purchased on [Date] from [Retailer/Website]. Is that correct? Please confirm your full name and email address associated with the purchase so we can access your information quickly.

**Step 4: Handling Missing Information or Errors**

**If information is missing or incorrect:**
> Hmmm, something seems a bit off. 🤔 Could you please double-check the information you provided? If you're still having trouble, no worries! You can reach out to our super helpful contact center at 1-800-AquaOops, and they'll be happy to assist you further.
**If information is verified and correct:**
> Perfect! Now that we have all the details, let's move on to the next step… (Continue with the return process according to AquaStride's specific procedures).

**Throughout the interaction:**

Maintain a friendly and helpful tone.
Use emojis to enhance the lighthearted personality.
Keep responses concise and easy to understand.
Offer reassurance and support throughout the process.
        """

In [None]:
generation_config = {
    "max_output_tokens": 2048,
    "temperature": 0.4,
    "top_p": 1,
    "top_k": 32,
}

safety_config = [
    generative_models.SafetySetting(
        category=generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
        threshold=generative_models.HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
    ),
    generative_models.SafetySetting(
        category=generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT,
        threshold=generative_models.HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
    ),
]

text_model = GenerativeModel(
    "gemini-2.0-flash",
    generation_config=generation_config,
    safety_settings=safety_config,
    system_instruction=[system_prompt],
)
image_analysis_model = GenerativeModel(
    "gemini-2.0-flash", system_instruction=[image_determination_prompt]
)

## Creating function declarations

[Function Calling in Gemini](https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/function-calling#function-declarations) allows the generative model to output structured data objects that can be used to interact with external systems and return the context to Gemini.

Here you'll write function declarations to extract information from images of shoe tags, inspect pictures of shoes for damage, or inform the user that the image is not of a shoe tag or damaged shoe.
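
To make the shape of these declarations concrete, here is a minimal sketch that does not use the Vertex AI SDK. It shows what the arguments of a shoe-tag function call might look like and checks them against the required keys. The `sample_args` payload (including the serial number `SN-0001`) and the `missing_required` helper are hypothetical and only illustrate the idea.

```python
# Hypothetical arguments, shaped like the `records` array declared for a shoe tag.
sample_args = {
    "records": [
        {
            "brand": "",
            "product": "TrailBlazer_23_Orange_Norm",
            "serialNumber": "SN-0001",  # made-up value for illustration
            "sizing": "us: 7, uk: 6",
            "madeIn": "",
        }
    ]
}

# Keys listed under "required" in the shoe-tag declaration.
REQUIRED_TAG_FIELDS = ("serialNumber", "sizing")


def missing_required(record, required=REQUIRED_TAG_FIELDS):
    """Return the required keys that are absent or empty in one record."""
    return [key for key in required if not record.get(key)]


print(missing_required(sample_args["records"][0]))
```

A check like this could run before querying the database, so that an incomplete extraction prompts the customer for a clearer photo instead of producing a misleading lookup.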

In [None]:
fn_json_from_tag = FunctionDeclaration(
    name="extract_json_from_tag",
    description="Extracts a clean JSON record from text that contains: brand, product, serialNumber, sizing, and madeIn.",
    parameters={
        "type": "object",
        "properties": {
            "records": {
                "type": "array",
                "description": "A shoe tag",
                "items": {
                    "description": "Data for querying the database based on the information found",
                    "type": "object",
                    "properties": {
                        "brand": {"type": "string", "description": "The brand website"},
                        "product": {
                            "type": "string",
                            "description": "The SKU of the shoe tag. i.e.: TrailBlazer_23_Orange_Norm",
                        },
                        "serialNumber": {
                            "type": "string",
                            "description": "The Serial Number of the shoe tag, commonly denoted as: SN",
                        },
                        "sizing": {
                            "type": "string",
                            "description": "The shoe sizes from the shoe tag. i.e. ['us: 7'], ['uk: 2.5'], ...",
                        },
                        "madeIn": {
                            "type": "string",
                            "description": "The location the shoe was made in",
                        },
                    },
                    "required": [
                        "serialNumber",
                        "sizing",
                    ],  # Defines what is required for a successful call.
                },
            }
        },
    },
)

fn_json_shoe_damage = FunctionDeclaration(
    name="extract_json_shoe_damage",
    description="Extracts a clean JSON record from text that contains: damagedAreas, damageType, severity, and additionalNotes.",
    parameters={
        "type": "object",
        "properties": {
            "records": {
                "type": "array",
                "description": "A damaged shoe",
                "items": {
                    "description": "Data for querying the database based on the information found",
                    "type": "object",
                    "properties": {
                        "damagedAreas": {
                            "type": "string",
                            "description": "The areas of damage found on the shoe. i.e. ('[Area 1]', '[Area 2]', ...)",
                        },
                        "damageType": {
                            "type": "string",
                            "description": "The type of damage",
                        },
                        "severity": {
                            "type": "string",
                            "description": "The Severity Level",
                        },
                        "additionalNotes": {
                            "type": "string",
                            "description": "Optional Additional Notes",
                        },
                    },
                    "required": ["damagedAreas", "damageType"],
                },
            }
        },
    },
)


fn_not_related = FunctionDeclaration(
    name="catch_text_regarding_warranty",
    description="This function is used when there is no json format. Respond whenever there is text about warranty evaluations.",
    parameters={
        "type": "object",
        "properties": {
            "records": {
                "type": "array",
                "description": "A sentence similar to this: I am unable to help you with that image because it does not help with warranty evaluations.",
                "items": {
                    "description": "A simple sentence",
                    "type": "object",
                    "properties": {
                        "sentence": {"type": "string", "description": "A sentence."}
                    },
                    "required": ["sentence"],
                },
            }
        },
    },
)

## Create the required methods and function calling helper
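
The helper in the next cell routes on the name of the function call that Gemini returns. As a minimal sketch of that routing idea, the snippet below uses a stand-in object instead of a real model response. The `route` function and `fake_call` object are hypothetical. The step numbers mirror the ones the notebook assigns to `current_step`.

```python
from types import SimpleNamespace


def route(function_call):
    """Map a function call name to the step the conversation moves to next."""
    next_step_by_name = {
        "extract_json_from_tag": 1,  # tag verified, ask for the damage photo
        "extract_json_shoe_damage": 2,  # damage assessed, ask for confirmation
    }
    # Any other name (for example the catch-all function) leaves the step unchanged.
    return next_step_by_name.get(function_call.name)


# Stand-in for a function call object exposing `.name` and `.args`.
fake_call = SimpleNamespace(
    name="extract_json_from_tag",
    args={"records": [{"serialNumber": "SN-0001"}]},  # made-up value
)
print(route(fake_call))
```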

In [None]:
# Receives the function call returned by Gemini and runs the matching step of the claim flow.


def flow_manager(current_function_call):
    global current_step
    response = ""
    match current_function_call.name:
        case "catch_text_regarding_warranty":
            return "Please respond to the recent question 🥹"
        case "extract_json_shoe_damage":
            damage_area = current_function_call.args["records"][0].get("damagedAreas")
            damage_type = current_function_call.args["records"][0].get("damageType")
            prompt_DB = f"""
**Context:** You are a warranty analyst assisting with a claim for Aquastrides shoes. Your role is to provide a preliminary assessment based on the warranty policy.

**Information Provided:**

**Damaged area:** {damage_area}
**Type of damage:** {damage_type}

**Task:**

1. **Analyze** the provided damage information in relation to the Aquastrides Warranty Policy.
2. **Identify** if this type of damage, in the specified area, is typically covered or excluded under the warranty. Be lenient in claims.
3. **Provide a concise decision:**
    * "Covered" - If the damage appears consistent with warranty coverage.
    * "Not Covered" - If the damage appears inconsistent with warranty coverage. Only when it is very obvious that it should not apply.

**Important:** Provide a definitive approval or denial. Your assessment guides the next steps in the workflow.
"""
            result, page_source = get_answer(
                rag_info["embeddings_dataframe"],
                rag_info["my_index_endpoint"],
                rag_info["index_id"],
                prompt_DB,
            )
            current_step = 2
            return result + "\n Are you okay with this decision? 🤔"

        case "extract_json_from_tag":
            sn = current_function_call.args["records"][0].get("serialNumber")
            sku = current_function_call.args["records"][0].get("product")
            prompt_DB = f"Provided the Serial_No ({sn}) and SKU ({sku}), Determine the cx_name who purchased this serial number.\n Output the Owner (cx_name) and the address (cx_address) in this format: \nThank you [cx_name] for your purchase! We have you on file at [cx_address]."
            result, page_source = get_answer(
                rag_info["embeddings_dataframe"],
                rag_info["my_index_endpoint"],
                rag_info["index_id"],
                prompt_DB,
            )
            # response = response + f"{result}(The following was based on: {page_source}. SN: {sn} / SKU: {sku})"
            response = (
                result
                + "\n\n Now that we have handled verification 🥳, can you please submit an image of the damaged component of your shoe? 🤔"
            )
            current_step = 1

            return response
        case _:
            return "Called Default. No Function Call Found"

In [None]:
def convert_image_for_analysis(image):
    image = PIL_Image.open(image)
    image_path = os.path.join("", "uploaded_image.png")
    image.save(image_path, format="PNG")

    # Encode image to base64
    with open(image_path, "rb") as image_file:
        encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
        # Use Part.from_data instead of Part.from_uri
        image_part = Part.from_data(mime_type="image/png", data=encoded_image)
        return image_part


def is_valid_email(email):
    pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    return re.match(pattern, email.strip()) is not None
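
As a quick illustration of how the email pattern behaves, the sketch below repeats the helper so it runs on its own and tries it on a few made-up addresses. The sample inputs are hypothetical.

```python
import re


def is_valid_email(email):
    # Same pattern as the notebook helper: local part, "@", domain, dot, 2+ letter TLD.
    pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    return re.match(pattern, email.strip()) is not None


# Surrounding whitespace is stripped before matching; a missing TLD is rejected.
for candidate in ["pat@example.com", "  pat@example.com  ", "pat@example", "not an email"]:
    print(repr(candidate), is_valid_email(candidate))
```

The pattern is a pragmatic format check rather than full address validation, which is usually enough for a demo flow that only needs to catch obvious typos.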

## Build the demo app interface with Gradio

In [None]:
# Function and variables
warranty_tool = Tool(
    function_declarations=[fn_json_from_tag, fn_json_shoe_damage, fn_not_related]
)
current_step = 0

# Enables Forced Function Calling to make sure that it selects a function every time
tool_config = ToolConfig(
    function_calling_config=ToolConfig.FunctionCallingConfig(
        mode=ToolConfig.FunctionCallingConfig.Mode.ANY,
        allowed_function_names=[
            "extract_json_from_tag",
            "extract_json_shoe_damage",
            "catch_text_regarding_warranty",
        ],
    )
)


# Main bot function
def bot(message, history):
    global current_step

    # Previous message should ideally contain the warranty decision
    previous_message = history[-1] if history else ""
    try:
        # --- Image Processing ---
        if message.get("files"):
            # Prepares Image from Gradio into format accepted by Generative Models
            converted_image = convert_image_for_analysis(message["files"][0])

            # Converts the image into JSON (Text)
            image_output = image_analysis_model.generate_content(
                [image_determination_prompt, converted_image],
                generation_config=generation_config,
                safety_settings=safety_config,
            ).text

            # Gets the Function call for the image
            image_analysis_output = image_analysis_model.generate_content(
                image_output,
                generation_config=generation_config,
                safety_settings=safety_config,
                tools=[warranty_tool],
                tool_config=tool_config,
            )

            # Passes the Function Call to the Function Manager to handle the image as needed.
            current_output = flow_manager(
                image_analysis_output.candidates[0].function_calls[0]
            )

            # Output to User
            return current_output

        # Generate text response using the model

        match current_step:
            # Case 0: Handles anything around trying to upload the image of your inner shoe Tag
            case 0:
                response = text_model.generate_content(
                    message["text"],
                    generation_config=generation_config,
                    safety_settings=safety_config,
                )
                # The enclosing try/except catches the error raised if safety filters block the response
                return response.text

            # Case 1: Focused on responses about the uploaded tag (anything around "Now that we have handled verification 🥳, can you please submit an image of the damaged component of your shoe? 🤔")
            case 1:
                # Issues with tag image analysis / uploading a photo of the damage
                # Flatten the chat history into plain text, one turn per line
                history_text = "\n".join(str(turn) for turn in history)
                content = f"Respond to the previous context focusing on helping the user submit a photo of their shoe showing the damaged components: {message['text']}| If there is a history, make your response based on the previous chat messages as well:\n{history_text}"
                response = text_model.generate_content(
                    content,
                    generation_config=generation_config,
                    safety_settings=safety_config,
                )
                return response.candidates[0].text

            # Case 2: Focused on issues surrounding the Warranty approval process ("Are you okay with this decision? 🤔")
            case 2:
                if "not covered" not in previous_message[1].lower() and (
                    "yes" in message["text"].lower()
                    or "agree" in message["text"].lower()
                ):
                    current_step = 3  # Move to shipping details
                    response = "Great! To get your shoes back to us for repair/replacement, please provide me with your email address. We'll send you a prepaid shipping label, box, and instructions right away to the address on file 😌."
                    return response

                if "not covered" in previous_message[1].lower() and (
                    "yes" in message["text"].lower()
                    or "agree" in message["text"].lower()
                ):
                    current_step = 4  # Move to customer support referral
                    response = "Thank you for your understanding. For further assistance with your warranty claim, please contact our Customer Support team at support@aquastrider.com. They'll be happy to help!\n Is there anything else I can help you with or learn about our other products? 👞"
                    return response

                else:  # Handle negative sentiment, concerns, or not covered cases
                    current_step = 4  # Move to customer support referral
                    response = "I understand you may have some concerns. For further assistance with your warranty claim, please contact our Customer Support team at support@aquastrider.com. They'll be happy to help!\n Is there anything else I can help you with or learn about our other products? 👞"
                    return response
            case 3:
                if is_valid_email(message["text"].lower()):
                    current_step = 4
                    return "Thank you for providing the email! The return box and label will be sent out immediately! ⚡⚡⚡ Please check your email for confirmation. \nDo you have any other questions about our products or about AquaStride?"
                else:
                    return "Please provide a valid email! 🫠"
            case 4:
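                # Match "no" as a whole word so that words like "know" or "not" do not end the chat
                if re.search(r"\b(no|nope)\b", message["text"].lower()):
                    return "Thank you for chatting with us today. See you next time! 😊"
                # Flatten the chat history into plain text, one turn per line
                history_text = "\n".join(str(turn) for turn in history) if history else ""
                response = f"Answer the user's questions about AquaStride, the company. User Question: {message['text']} | Chat History: {history_text}"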
                result, page_source = get_answer(
                    rag_info["embeddings_dataframe"],
                    rag_info["my_index_endpoint"],
                    rag_info["index_id"],
                    response,
                )
                return (
                    str(result)
                    + "\n\n Feel free to ask me any other questions, if not, Have a wave of a day! 🤠"
                )

            case _:
                return "We are in the endgame now. (The Avengers: Infinity War)"
        return "Something Sneaky happened."

    except Exception as e:
        return f"A bad error occurred: {str(e)}"
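
The step 2 branching in `bot` reduces to two signals: whether the previous decision contains "not covered", and whether the customer's reply contains "yes" or "agree". A minimal sketch of that rule as a pure function makes it easier to check in isolation. The name `next_step_after_decision` is hypothetical and not part of the notebook.

```python
def next_step_after_decision(decision_text, user_reply):
    """Return 3 (collect shipping email) or 4 (refer to customer support)."""
    reply = user_reply.lower()
    # Substring checks, mirroring the conditions used in the chatbot flow.
    agreed = "yes" in reply or "agree" in reply
    covered = "not covered" not in decision_text.lower()
    if agreed and covered:
        return 3  # claim accepted by both sides: ask for the email address
    return 4  # not covered, or the customer has concerns: hand off to support


print(next_step_after_decision("Covered", "Yes, that works"))
print(next_step_after_decision("Not Covered", "yes"))
```

Because these are substring checks, a reply such as "I disagree" also counts as agreement. Matching on whole words would be a stricter alternative.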

# Run your demo app

This will instantiate the demo app and allow you to interact with and test your chatbot. Click on the link in the output of this cell to access a live instance of the demo app.

In [None]:
# Downloading Sample Images to use for the Demo
! wget -q -O my_shoe_tag.png 'https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/my-aquastride-shoe-tag.png'
! wget -q -O damaged_shoe.png 'https://storage.googleapis.com/github-repo/generative-ai/gemini/use-cases/rag/warranty-claim-chatbot/shoe-damaged.png'

# Set the Current Step that the user flow is on to zero. A diagram is referenced in the section above to help understand the flow.
current_step = 0

demo = gr.ChatInterface(
    fn=bot,
    examples=[
        {"text": "Hello!", "files": []},
        {"text": "Here is my tag!", "files": ["my_shoe_tag.png"]},
        {"text": "Sure! Here is my damaged shoe!", "files": ["damaged_shoe.png"]},
    ],
    title="AquaStride Warranty Claim Bot!",
    multimodal=True,
    textbox=gr.MultimodalTextbox(interactive=True, file_types=["image"]),
)

demo.launch(debug=True)

# Clean Up

Delete the Google Cloud Assets and clean up your environment:

- Shut down the Gradio Instance
- Delete the [Public Endpoint](https://cloud.google.com/python/docs/reference/aiplatform/1.20.0/google.cloud.aiplatform.MatchingEngineIndexEndpoint) / GCS Bucket / [Index](https://cloud.google.com/python/docs/reference/aiplatform/1.23.0/google.cloud.aiplatform.MatchingEngineIndex)
- If preferred, you can do this via the console:
  - You can navigate to [Google Cloud Vector Search](https://console.cloud.google.com/vertex-ai/matching-engine/index-endpoints) and undeploy and delete your endpoint here
  - You can navigate to the [Google Cloud Storage Bucket](https://console.cloud.google.com/storage/browser) and delete the bucket here

In [None]:
# Delete your GCS Bucket
! gcloud alpha storage rm --recursive {rag_info["bucket_uri"]}

# Undeploy all indexes from your Index Endpoint and delete the endpoint
rag_info["my_index_endpoint"].delete(force=True)

# Delete your Index. This command will take 15-25 minutes to delete.
rag_info["index"].delete()

For the final step, delete your index from [the Google Cloud Vector Search UI](https://console.cloud.google.com/vertex-ai/matching-engine/indexes).