In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Stage 2: Building MVP - 04 Evaluation


<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fworkshops%2Frag-ops%2F2.4_mvp_evaluation.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

<div style="clear: both;"></div>

<b>Share to:</b>

<a href="https://www.linkedin.com/sharing/share-offsite/?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/8/81/LinkedIn_icon.svg" alt="LinkedIn logo">
</a>

<a href="https://bsky.app/intent/compose?text=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/7/7a/Bluesky_Logo.svg" alt="Bluesky logo">
</a>

<a href="https://twitter.com/intent/tweet?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/5a/X_icon_2.svg" alt="X logo">
</a>

<a href="https://reddit.com/submit?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb" target="_blank">
  <img width="20px" src="https://redditinc.com/hubfs/Reddit%20Inc/Brand/Reddit_Logo.png" alt="Reddit logo">
</a>

<a href="https://www.facebook.com/sharer/sharer.php?u=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/workshops/rag-ops/2.4_mvp_evaluation.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/51/Facebook_f_logo_%282019%29.svg" alt="Facebook logo">
</a>            

## Overview

This notebook is the fourth in a series designed to guide you through building a Minimum Viable Product (MVP) for a Multimodal Retrieval Augmented Generation (RAG) system using the Vertex Gemini API.

Having built a functional RAG system with retrieval and generation components in the previous notebook, we now turn our attention to evaluating its performance. This notebook focuses on assessing the quality of the generated answers using two key metrics, providing a comprehensive understanding of your system's strengths and weaknesses.

**Here's what you'll achieve:**

* **Implement Evaluation Metrics:**  Gain a deep understanding of two crucial evaluation metrics for RAG systems: Answer Correctness and Context Recall.  You'll implement these metrics from scratch, giving you full transparency into the underlying calculations and prompt engineering involved.
* **Analyze Model Performance:**  Apply the implemented metrics to evaluate the two sets of generated answers loaded in this notebook (`training_data_pro` and `training_data_flash`). This analysis will provide valuable insights into the accuracy and relevance of the generated responses.
* **Compare and Contrast:**  Visualize the performance of both models across all samples of the ground truth data using comparative plots. This visualization will help you identify trends, strengths, and areas for potential improvement in each model's performance.
* **Enhance Transparency and Understanding:**  By coding the evaluation metrics from scratch, you gain a deeper understanding of how they work and can customize them to your specific needs. This transparency allows for more informed decision-making when refining your RAG system.

This notebook provides a crucial step in the iterative development of your RAG system. By rigorously evaluating your system's performance, you can identify areas for improvement and optimize its ability to provide accurate and relevant information. This hands-on approach to evaluation empowers you to build a more robust and reliable RAG MVP.


## Getting Started

### Install Vertex AI SDK for Python


In [1]:
%pip install --upgrade --user --quiet google-cloud-aiplatform

### Restart runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which restarts the current kernel.

In [2]:
import sys

# The restart is only triggered on Colab (detected through the already-loaded
# "google.colab" module); in any other environment this cell does nothing.
if "google.colab" in sys.modules:
    import IPython

    # do_shutdown(True) asks the running kernel to shut down and restart.
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

If you are running this notebook on Google Colab, run the cell below to authenticate your environment.


In [1]:
import sys

# Interactive user authentication is only performed when running on Colab.
if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information, GCS Bucket and initialize Vertex AI SDK

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [2]:
import os
import sys

from google.cloud import storage
import vertexai

PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
LOCATION = "us-central1"
BUCKET_NAME = "mlops-for-genai"

# If the placeholder was left untouched, fall back to the environment.
# str() turns a missing variable into the literal "None", which is rejected below.
if PROJECT_ID == "[your-project-id]":
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

# Fail fast before any client is created with an unusable project id.
if not PROJECT_ID or PROJECT_ID == "[your-project-id]" or PROJECT_ID == "None":
    raise ValueError("Please set your PROJECT_ID")


vertexai.init(project=PROJECT_ID, location=LOCATION)

# Initialize cloud storage; `bucket` is read by the GCS helper functions defined later.
storage_client = storage.Client(project=PROJECT_ID)
bucket = storage_client.bucket(BUCKET_NAME)

In [3]:
# Variables for data location. Do not change.

# GCS prefix of the production data (not referenced by the cells visible in this notebook).
PRODUCTION_DATA = "multimodal-finanace-qa/data/unstructured/production/"
# File name of the pickled generation results; used by the GCS helper functions below.
PICKLE_FILE_NAME = "training_data_results.pkl"

### Import libraries


In [4]:
# Library
import pickle

from google.cloud import storage
import nltk
import numpy as np
import pandas as pd
from rich import print as rich_print
from vertexai.generative_models import GenerationConfig, GenerativeModel
from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel

# Sentence-tokenizer data needed by nltk.sent_tokenize in the Context Recall cell.
# Older NLTK releases read the "punkt" resource, while NLTK >= 3.8.2 looks up
# "punkt_tab" instead, so both are fetched to keep the notebook working on either.
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource)

### Load the Gemini 2.0 models

Learn more about all [Gemini API models on Vertex AI](https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models#gemini-models).

The Gemini model family has several model versions. This notebook uses Gemini 2.0 Flash (`gemini-2.0-flash`), a lightweight, fast, and cost-efficient model, which makes it a great option for prototyping.


In [5]:
# NOTE(review): both IDs are currently "gemini-2.0-flash", so the "flash" and
# "pro" handles below wrap the same model — confirm this is intended.
MODEL_ID_FLASH = "gemini-2.0-flash"  # @param {type:"string"}
MODEL_ID_PRO = "gemini-2.0-flash"  # @param {type:"string"}


# The variable names still carry a "15" suffix although the IDs above are 2.0 models.
gemini_15_flash = GenerativeModel(MODEL_ID_FLASH)
gemini_15_pro = GenerativeModel(MODEL_ID_PRO)

In [6]:
# @title Helper Functions


def get_load_dataframes_from_gcs():
    """Download the index pickle from GCS and unpack the four objects it holds.

    Reads ``multimodal-finanace-qa/data/embeddings/index_db.pkl`` from the
    module-level ``bucket``, saves it in the working directory under its own
    file name (``index_db.pkl``) and unpickles it.

    Returns:
        A 4-tuple ``(index_db_final, extracted_text_chunk_df,
        video_metadata_chunk_df, audio_metadata_chunk_df)``, in the order in
        which the objects are stored in the pickle.

    Raises:
        ValueError: If the pickled sequence does not hold exactly four items.
    """
    gcs_path = "multimodal-finanace-qa/data/embeddings/index_db.pkl"

    # Keep the object's own base name locally. Writing the index to
    # PICKLE_FILE_NAME would leave a file called "training_data_results.pkl"
    # that actually contains the index and clash with the training-data loader.
    local_path = gcs_path.rsplit("/", 1)[-1]

    # Download the pickle file from GCS
    bucket.blob(gcs_path).download_to_filename(local_path)

    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only point this helper at a bucket whose contents you trust.
    with open(local_path, "rb") as f:
        dataframes = pickle.load(f)

    # Unpacking doubles as a sanity check on the number of stored objects.
    (
        index_db_final,
        extracted_text_chunk_df,
        video_metadata_chunk_df,
        audio_metadata_chunk_df,
    ) = dataframes

    return (
        index_db_final,
        extracted_text_chunk_df,
        video_metadata_chunk_df,
        audio_metadata_chunk_df,
    )


def get_load_training_dataframes_from_gcs():
    """Fetch the pickled generation results from GCS and return both result sets.

    The object ``multimodal-finanace-qa/data/structured/<PICKLE_FILE_NAME>`` is
    downloaded from the module-level ``bucket`` into the working directory
    (keeping the name ``PICKLE_FILE_NAME``) and unpickled.

    Returns:
        The pair ``(training_data_flash, training_data_pro)``, in the order in
        which the two objects are stored in the pickle.
    """
    remote_name = "multimodal-finanace-qa/data/structured/" + PICKLE_FILE_NAME
    local_name = f"{PICKLE_FILE_NAME}"

    # Pull the object down next to the notebook.
    bucket.blob(remote_name).download_to_filename(local_name)

    # NOTE: unpickling runs code stored in the file; the bucket must be trusted.
    with open(local_name, "rb") as handle:
        loaded = pickle.load(handle)

    # The pickle is expected to hold exactly two entries: flash first, then pro.
    flash_results, pro_results = loaded
    return flash_results, pro_results

![](https://storage.googleapis.com/mlops-for-genai/multimodal-finanace-qa/img/rag_eval_flow.png)

In [7]:
# Load the data extracted in the previous step (the index DB) together with the
# generated answers that will be evaluated below.
# Make sure that you have run the previous notebook: stage_2_mvp_chunk_embeddings.ipynb


(
    index_db_final,
    extracted_text_chunk_df,
    video_metadata_chunk_df,
    audio_metadata_chunk_df,
) = get_load_dataframes_from_gcs()
training_data_flash, training_data_pro = get_load_training_dataframes_from_gcs()

In [8]:
# Preview the first rows of the index loaded from GCS.
index_db_final.head()

In [9]:
# Preview two rows of the "flash" result set.
training_data_flash.head(2)

In [10]:
# Preview two rows of the "pro" result set.
training_data_pro.head(2)

In [11]:
# Size of the "pro" result set; every row is scored by the evaluation cells below.
training_data_pro.shape

### Evaluations

In [12]:
# @title Answer Correctness

import numpy as np


def embed_text(
    texts: list[str] = ["banana muffins? ", "banana bread? banana muffins?"],
    task: str = "RETRIEVAL_DOCUMENT",
    model_name: str = "text-embedding-005",
) -> list[float]:
    """Embed texts with a pre-trained foundation model and return the FIRST vector.

    Args:
        texts: The texts to embed. A single string is accepted and treated as
            one text (it is not iterated character by character).
        task: Task type passed to ``TextEmbeddingInput`` for every text.
        model_name: Name handed to ``TextEmbeddingModel.from_pretrained``.

    Returns:
        The embedding values of the first text only, as a flat list of floats.
        Embeddings of any further texts are computed but discarded.

    Raises:
        ValueError: If ``texts`` is empty.
    """
    # A bare string would otherwise be embedded one character at a time.
    if isinstance(texts, str):
        texts = [texts]
    if not texts:
        raise ValueError("embed_text requires at least one text")

    model = TextEmbeddingModel.from_pretrained(model_name)
    inputs = [TextEmbeddingInput(text, task) for text in texts]
    embeddings = model.get_embeddings(inputs)
    # Only the first embedding is part of this helper's contract.
    return [embedding.values for embedding in embeddings][0]


def calculate_final_score(semantic_similarity, score, weights):
    """
    Calculates the final score as a weighted average of semantic similarity and factual similarity.

    Args:
      semantic_similarity: Float value representing semantic similarity.
      score: Float value representing factual similarity (or another score).
      weights: A list or tuple of two float values representing the weights for semantic similarity and factual similarity, respectively. The weights should sum to 1.

    Returns:
      The final score, rounded to two decimals.

    Raises:
      AssertionError: If ``weights`` does not hold exactly two values or the values do not sum to 1.
    """
    import math

    # Ensure weights are valid
    assert len(weights) == 2, "Weights must have two values"
    # Compare with a tolerance: an exact float comparison can reject weights
    # that sum to 1 mathematically but not bit-for-bit.
    assert math.isclose(sum(weights), 1.0, rel_tol=1e-9), "Weights must sum to 1"

    semantic_weight, factual_weight = weights
    final_score = (semantic_weight * semantic_similarity) + (factual_weight * score)
    return round(final_score, 2)


def get_answer_correctness(data_samples, debug=False, num_runs=3, retry=2):
    """Score a generated answer against its ground truth with an LLM judge.

    The judge model splits the answer into true-positive (TP), false-positive
    (FP) and false-negative (FN) statements; an F1 value is derived from the
    three counts and blended with the embedding similarity between answer and
    ground truth (weights 0.1 semantic / 0.9 factual).

    Args:
        data_samples: Mapping with the keys ``"question"``, ``"answer"`` and
            ``"ground_truth"``. Each value may be a string or a list of
            strings; the values are interpolated into the prompt as given and
            the first text of answer / ground truth is embedded.
        debug: When true, print the raw judge response and the partial scores
            of every successful run.
        num_runs: Number of independent judge runs.
        retry: Maximum number of judge calls per run when the response cannot
            be parsed.

    Returns:
        The highest final score over the successful runs, or ``nan`` when no
        run produced a parseable response.
    """
    import json

    question = data_samples["question"]
    answer = data_samples["answer"]
    ground_truth = data_samples["ground_truth"]

    response_schema = {
        "type": "object",
        "properties": {
            "TP": {"type": "array", "items": {"type": "string"}},
            "FP": {"type": "array", "items": {"type": "string"}},
            "FN": {"type": "array", "items": {"type": "string"}},
        },
    }

    prompt = f"""Task: Analyze the Question, Answer, and Ground Truth to identify and categorize information.
Provide your response in JSON format with the following structure:
{{
  "TP": ["list of true positive statements"],
  "FP": ["list of false positive statements"],
  "FN": ["list of false negative statements"]
}}

Here's how to categorize the information:

* **TP (True Positive):**  Statements that are factually correct and present in BOTH the Answer and Ground Truth.
   *Example:* If the Ground Truth says "The capital of France is Paris" and the Answer says "Paris is the capital of France", then "The capital of France is Paris" is a TP.
* **FP (False Positive):** Statements present in the Answer BUT NOT factually correct according to the Ground Truth.
   *Example:* If the Answer says "The Earth is flat", but the Ground Truth does not support this, then "The Earth is flat" is an FP.
* **FN (False Negative):**  Factually correct statements present in the Ground Truth BUT missing from the Answer.
   *Example:* If the Ground Truth says "The sky is blue" but the Answer doesn't mention this fact, then "The sky is blue" is an FN.

Question: {question}

Answer:  {answer}

Ground Truth:  {ground_truth}

Analyze the answer and extract the TP, FP, and FN based on factual correctness.
"""

    cg_model = GenerativeModel(
        model_name="gemini-2.0-flash",
        generation_config=GenerationConfig(
            response_mime_type="application/json", response_schema=response_schema
        ),
    )

    def _as_text_list(value):
        # embed_text expects a list of texts; wrap a bare string.
        return [value] if isinstance(value, str) else list(value)

    # The embeddings do not depend on the judge response, so compute them once
    # instead of once per run.
    answer_embedding = np.asarray(embed_text(_as_text_list(answer)), dtype=float)
    ground_truth_embedding = np.asarray(
        embed_text(_as_text_list(ground_truth)), dtype=float
    )
    # embed_text returns ONE vector (a flat list of floats). Compare the whole
    # vectors via cosine similarity; indexing [0] on each would only multiply
    # the first components.
    norm_product = np.linalg.norm(answer_embedding) * np.linalg.norm(
        ground_truth_embedding
    )
    if norm_product:
        semantic_similarity = round(
            float(np.dot(answer_embedding, ground_truth_embedding) / norm_product), 2
        )
    else:
        semantic_similarity = 0.0

    weights = (
        0.1,
        0.9,
    )  # weightage to semantic similarity, weightage for factual similarity

    scores = []
    for _ in range(num_runs):
        counts = None
        response = None
        for _ in range(retry):
            response = cg_model.generate_content(prompt)
            try:
                # The judge is asked for JSON, so parse it as JSON. eval() would
                # execute model output and fails on JSON literals such as null.
                parsed = json.loads(response.text)
                counts = tuple(len(parsed[key]) for key in ("TP", "FP", "FN"))
                break  # Break out of the retry loop if successful
            except (ValueError, KeyError, TypeError):
                print("Retrying...")  # Indicate a retry is happening

        if counts is None:
            # Every attempt of this run failed: skip it rather than reuse the
            # counts of a previous run (or hit an unbound variable).
            continue

        tp, fp, fn = counts
        f1_score = tp / (tp + 0.5 * (fp + fn)) if tp > 0 else 0

        final_score = calculate_final_score(semantic_similarity, f1_score, weights)
        scores.append(final_score)

        if debug:
            rich_print(response.text)
            print("F1 Score: ", f1_score)
            print("Semantic Similarity: ", semantic_similarity)
            print("Final Score: Answer Correctness :", final_score)

    if not scores:
        return float("nan")
    return np.max(scores)  # Best (maximum) score across the runs


def get_answer_correctness_row_wise(row, num_runs=3, retry=2):
    """Adapter that scores one result row with ``get_answer_correctness``.

    Args:
        row: Mapping-like row providing ``"question"``, ``"gen_answer"`` (the
            generated answer) and ``"answer"`` (used as the ground truth).
        num_runs: Forwarded to ``get_answer_correctness``.
        retry: Forwarded to ``get_answer_correctness``.

    Returns:
        Whatever ``get_answer_correctness`` returns for this row.
    """
    # Each field is wrapped in a one-element list, the shape the scorer is fed.
    return get_answer_correctness(
        {
            "question": [row["question"]],
            "answer": [row["gen_answer"]],
            "ground_truth": [row["answer"]],
        },
        num_runs=num_runs,
        retry=retry,
    )

![](https://storage.googleapis.com/mlops-for-genai/multimodal-finanace-qa/img/answer_correctness.png)

In [13]:
# @title Context Recall


import nltk


def get_context_precision_prompt(question, answer_chunk_list, context):
    """Build the judge prompt that checks answer statements against the context.

    Args:
        question: The question that was asked.
        answer_chunk_list: The answer split into statements; interpolated into
            the prompt as given.
        context: The context text the statements are checked against.

    Returns:
        The prompt string. It asks for a JSON list with one object per
        statement holding ``statement``, ``attributed`` (1 or 0) and ``reason``.
    """
    prompt_text = f"""Task: Given a context, an answer (broken down into statements), and a question, carefully analyze EACH statement in the answer and classify if it can be logically inferred or directly attributed to the given context.

    Provide your response in JSON format with the following structure for each statement in the answer:
    [
      {{
        "statement": "<the original statement from the answer>",
        "attributed": <1 if attributed to the context, 0 otherwise>,
        "reason": "<a concise explanation for the attribution>"
      }},
      {{
        "statement": "<the original statement from the answer>",
        "attributed": <1 if attributed to the context, 0 otherwise>,
        "reason": "<a concise explanation for the attribution>"
      }},
      ...
    ]

    Instructions:
    - Focus on the meaning and implications of both the context and EACH statement in the answer.
    - Consider if the statement is a reasonable deduction or a paraphrase of information present in the context.
    - If the statement introduces new information not mentioned or implied in the context, it should be classified as not attributed.
    - Ensure that you provide a response for EVERY statement in the answer, without any repetitions or omissions


    Question: {question}
    Answer Statements: {answer_chunk_list}
    Context: {context}
    """
    return prompt_text


def calculate_context_precision(question, answer, context, num_runs=3, retry=2):
    """
    Calculates the share of answer statements that an LLM judge attributes to the context.

    The answer is split into sentences with ``nltk.sent_tokenize``; the judge
    marks every sentence as attributed (1) or not (0) and the score of a run is
    ``attributed / total``. The notebook stores this value in a column named
    ``context_recall``.

    Args:
        question: The question asked.
        answer: The answer given.
        context: The context provided.
        num_runs: The number of times to run the evaluation.
        retry: The maximum number of judge calls per run when the response
            cannot be parsed or has an unexpected format.

    Returns:
        The highest per-run score (between 0 and 1) over the successful runs,
        or ``nan`` when no run produced a usable response.
    """
    import json

    response_schema = {
        "type": "array",
        "items": {
            "type": "object",
            "properties": {
                "statement": {"type": "string"},
                "reason": {"type": "string"},
                "attributed": {"type": "integer"},
            },
        },
    }

    cg_model = GenerativeModel(
        model_name="gemini-2.0-flash",
        generation_config=GenerationConfig(
            response_mime_type="application/json", response_schema=response_schema
        ),
    )

    answer_chunk_list = nltk.sent_tokenize(answer)
    # The prompt is identical for every attempt, so build it once.
    prompt = get_context_precision_prompt(question, answer_chunk_list, context)

    precision_scores = []
    for _ in range(num_runs):
        evaluation_results = None
        for _ in range(retry):
            try:
                response = cg_model.generate_content(prompt).text
                # Parse as JSON: eval() would execute model output and raises
                # NameError on JSON literals such as null / true / false.
                candidate = json.loads(response)
            except ValueError:  # Unparseable or missing response text
                print("Retrying...")
                continue

            # Check if the response has the expected format
            if isinstance(candidate, list) and all(
                isinstance(result, dict) and "attributed" in result
                for result in candidate
            ):
                evaluation_results = candidate
                break  # Break out of the retry loop if successful
            print("Retrying...")

        if evaluation_results is None:
            # All attempts of this run failed: skip it instead of scoring a
            # malformed response or the results of a previous run.
            continue

        # Count the total number of statements and the number of statements attributed to the context
        attributed_statements = sum(
            1 for result in evaluation_results if result.get("attributed") == 1
        )
        total_statements = len(evaluation_results)

        # Score of this run; an empty result list counts as 0
        context_precision = (
            attributed_statements / total_statements if total_statements > 0 else 0
        )
        precision_scores.append(context_precision)

    if not precision_scores:
        return float("nan")
    return np.max(precision_scores)  # Best (maximum) score across the runs


def get_context_precision_row_wise(row, num_runs=3, retry=2):
    """Adapter that scores one result row with ``calculate_context_precision``.

    Args:
        row: Mapping-like row providing ``"question"``, ``"gen_answer"`` and
            ``"citation"`` (a sequence of mappings with a ``"content"`` entry).
        num_runs: Forwarded to ``calculate_context_precision``.
        retry: Forwarded to ``calculate_context_precision``.

    Returns:
        Whatever ``calculate_context_precision`` returns for this row.
    """
    asked = row["question"]
    generated = row["gen_answer"]

    # Only the first 20 citations contribute to the context, one per line.
    cited_texts = []
    for citation in row["citation"][:20]:
        cited_texts.append(citation["content"])
    joined_context = "\n".join(cited_texts)

    return calculate_context_precision(
        asked, generated, joined_context, num_runs=num_runs, retry=retry
    )

![](https://storage.googleapis.com/mlops-for-genai/multimodal-finanace-qa/img/context_recall.png)

In [14]:
%%time

# "Pro" result set (training_data_pro)
# .copy() makes the subset an independent frame, so adding the score column
# below neither raises pandas' SettingWithCopyWarning nor depends on
# copy-vs-view semantics of the column selection.
training_data_pro_subset = training_data_pro[
    ["question", "answer", "gen_answer", "citation"]
].copy()

# Number of judge runs per row and retries per run.
# Re-run the cell if you get ServiceUnavailable: 503 / 502 Bad Gateway.
num_runs = 3
retry_attempts = 3

training_data_pro_subset["answer_correctness"] = training_data_pro_subset.apply(
    lambda row: get_answer_correctness_row_wise(
        row, num_runs=num_runs, retry=retry_attempts
    ),
    axis=1,
)

In [15]:
%%time

# "Flash" result set (training_data_flash)
# .copy() makes the subset an independent frame, so adding the score column
# below neither raises pandas' SettingWithCopyWarning nor depends on
# copy-vs-view semantics of the column selection.
training_data_flash_subset = training_data_flash[
    ["question", "answer", "gen_answer", "citation"]
].copy()

# Number of judge runs per row and retries per run.
# Re-run the cell if you get ServiceUnavailable: 503 / 502 Bad Gateway.
num_runs = 3
retry_attempts = 3

training_data_flash_subset["answer_correctness"] = training_data_flash_subset.apply(
    lambda row: get_answer_correctness_row_wise(
        row, num_runs=num_runs, retry=retry_attempts
    ),
    axis=1,
)

In [16]:
%%time

# Judge runs per row and retries per run.
# Re-run the cell if you get ServiceUnavailable: 503 502:Bad Gateway

num_runs = 3
retry_attempts = 3

# Extra keyword arguments given to DataFrame.apply are forwarded to the row
# function. The result of the context-precision helper is stored in the
# "context_recall" column.
training_data_pro_subset["context_recall"] = training_data_pro_subset.apply(
    get_context_precision_row_wise,
    axis=1,
    num_runs=num_runs,
    retry=retry_attempts,
)

In [18]:
%%time

# Judge runs per row and retries per run.
# Re-run the cell if you get ServiceUnavailable: 503 502:Bad Gateway

num_runs = 3
retry_attempts = 3

# Extra keyword arguments given to DataFrame.apply are forwarded to the row
# function. The result of the context-precision helper is stored in the
# "context_recall" column.
training_data_flash_subset["context_recall"] = training_data_flash_subset.apply(
    get_context_precision_row_wise,
    axis=1,
    num_runs=num_runs,
    retry=retry_attempts,
)

In [19]:
# Preview the "pro" rows together with the score columns added above.
training_data_pro_subset.head()

In [20]:
# Preview the "flash" rows together with the score columns added above.
training_data_flash_subset.head()

### Comparing the results

In [21]:
import matplotlib.pyplot as plt
import pandas as pd

# Assuming training_data_pro_subset and training_data_flash_subset are your DataFrames

# Combine the data
# The column labels must be distinct: with two identical dict keys the second
# entry silently replaces the first and only one series would be plotted.
combined_data = pd.DataFrame(
    {
        "Gemini 2.0 (pro run)": training_data_pro_subset["answer_correctness"],
        "Gemini 2.0 (flash run)": training_data_flash_subset["answer_correctness"],
    }
)

# Plot the combined data
combined_data.plot(kind="bar", title="Answer Correctness Comparison")
plt.xlabel("Index")  # Or any other relevant label for your x-axis
plt.ylabel("Answer Correctness")
plt.show()

In [22]:
import matplotlib.pyplot as plt
import pandas as pd

# Assuming training_data_pro_subset and training_data_flash_subset are your DataFrames

# Combine the data
# The column labels must be distinct: with two identical dict keys the second
# entry silently replaces the first and only one series would be plotted.
combined_data = pd.DataFrame(
    {
        "Gemini 2.0 (pro run)": training_data_pro_subset["context_recall"],
        "Gemini 2.0 (flash run)": training_data_flash_subset["context_recall"],
    }
)

# Plot the combined data
combined_data.plot(kind="bar", title="Context Recall Comparison")
plt.xlabel("Index")  # Or any other relevant label for your x-axis
plt.ylabel("Context Recall")
plt.show()

In [23]:
# Row label of the sample to inspect in both result sets.
index = 5

print("*******The question: *******\n")
rich_print(training_data_pro["question"][index])

print("\n*******The ground-truth answer:*******\n")
rich_print(training_data_pro["answer"][index])

# Print the same three fields for each result set. The headings carry a
# distinct label per set; identical "Gemini 2.0" headings made the two halves
# of the output indistinguishable.
for run_label, generated, scored in (
    ("Gemini 2.0 (pro run)", training_data_pro, training_data_pro_subset),
    ("Gemini 2.0 (flash run)", training_data_flash, training_data_flash_subset),
):
    print(f"\n*******The generated answer - {run_label}: *******\n")
    rich_print(generated["gen_answer"][index])

    print(f"\n*******The answer_correctness - {run_label}: *******\n")
    rich_print(scored["answer_correctness"][index])

    print(f"\n*******The context_recall - {run_label}: *******\n")
    rich_print(scored["context_recall"][index])

### Save the intermediate Files

In [None]:
# # [Optional]

# import pickle

# pickle_file_name ="training_data_eval_results.pkl"
# data_to_dump = [training_data_pro_subset, training_data_flash_subset]

# gcs_location = f"gs://mlops-for-genai/multimodal-finanace-qa/data/structured/{pickle_file_name}"

# with open(f"{pickle_file_name}", "wb") as f:
#     pickle.dump(data_to_dump, f)


# # Upload the pickle file to GCS
# !gsutil cp {pickle_file_name} {gcs_location}