In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Evaluate generated answers from Retrieval-Augmented Generation (RAG) using Rapid Evaluation and Dataflow ML with Vertex AI pipelines

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/gemini/evaluation/evaluate_rag_batch_pipeline.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fgemini%2Fevaluation%2Fevaluate_rag_batch_pipeline.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/evaluation/evaluate_rag_batch_pipeline.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/evaluation/evaluate_rag_batch_pipeline.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

| | |
|-|-|
|Author(s) | [Ivan Nardini](https://github.com/inardini) |

## Overview

This notebook shows how you can use Vertex AI Pipelines to build a Gen AI Model Evaluation Batch pipeline to evaluate a question-answering task with Rapid Eval API and DataflowML.

This tutorial uses the following Google Cloud ML services:

- `Dataflow`
- `Vertex AI Rapid Eval API`
- `Vertex AI Pipelines`

The steps performed include:

- Prepare the evaluation dataset.
- Build validation, evaluation and visualization pipeline components.
- Define your pipeline using Kubeflow Pipelines DSL package.
- Compile your pipeline.
- Submit your pipeline run.

Learn more about [Vertex AI Rapid Eval API](https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/evaluation), [Dataflow ML](https://cloud.google.com/dataflow/docs/machine-learning) and [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

## Get started

### Install required packages


In [None]:
%pip3 install --upgrade --quiet pip google-cloud-aiplatform google-cloud-pipeline-components
%pip3 install --upgrade --quiet pandas plotly multiprocess etils

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
# `sys` must be imported here: this cell runs before any other import cell.
import sys

# Restart the kernel only on Colab, detected by the `google.colab` module being loaded.
if "google.colab" in sys.modules:

    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [None]:
# Re-import `sys`: the kernel restart in the previous step clears all earlier imports.
import sys

# Interactive authentication is only performed when running on Colab.
if "google.colab" in sys.modules:

    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

# Make this project the default for the gcloud commands that follow
! gcloud config set project {PROJECT_ID}

# The shell capture returns the command output as a list of lines;
# the first line is kept as the project number.
PROJECT_NUMBER = !gcloud projects describe $PROJECT_ID --format="value(projectNumber)"
PROJECT_NUMBER = PROJECT_NUMBER[0]

REGION = "us-central1"  # @param {type: "string"}

#### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
# f-string so that the project ID is actually interpolated into the bucket name;
# a plain string would keep the literal "{PROJECT_ID}" braces, which are not
# valid characters in a bucket name.
BUCKET_NAME = f"your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

BUCKET_URI = f"gs://{BUCKET_NAME}"  # @param {type:"string"}

In [None]:
# Create the bucket in the configured region and project
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

#### Service Account and permissions

This notebook requires a service account with the following permissions:

-   `Vertex AI User` to call Vertex API
-   `Storage Object Admin` to read and write to your GCS bucket.
-   `Dataflow Worker` to execute work units for a Dataflow pipeline with Compute Engine service account.
-   `Dataflow Developer` to execute and manipulate Dataflow jobs.

[Check out the documentation](https://cloud.google.com/iam/docs/manage-access-service-accounts#iam-view-access-sa-gcloud) to know how to grant those permissions to a single service account.


In [None]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

# Fall back to the Compute Engine default service account only when no account
# was provided, so that a value entered above is not silently overwritten.
if not SERVICE_ACCOUNT or SERVICE_ACCOUNT == "[your-service-account]":
    SERVICE_ACCOUNT = f"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

In [None]:
! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member=serviceAccount:{SERVICE_ACCOUNT} \
    --role=roles/aiplatform.user

! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member=serviceAccount:{SERVICE_ACCOUNT} \
    --role=roles/storage.objectAdmin

! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member=serviceAccount:{SERVICE_ACCOUNT} \
    --role=roles/dataflow.worker

! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member=serviceAccount:{SERVICE_ACCOUNT} \
    --role=roles/dataflow.developer

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
from google.cloud import aiplatform

# Point the SDK at the project, region and staging bucket configured above.
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_URI,
)

### Set tutorial folder and workspace

Set a folder to collect data and any tutorial artifacts.

In [None]:
from pathlib import Path

# Local folders, all nested under ./tutorial in the current working directory.
ROOT_PATH = Path.cwd()
TUTORIAL_PATH = ROOT_PATH / "tutorial"
DATA_PATH = TUTORIAL_PATH / "data"
SRC_PATH = TUTORIAL_PATH / "src"
PIPELINE_PATH = TUTORIAL_PATH / "pipeline"
EVAL_PATH = TUTORIAL_PATH / "evaluations"

for local_dir in (DATA_PATH, SRC_PATH, PIPELINE_PATH, EVAL_PATH):
    local_dir.mkdir(parents=True, exist_ok=True)

from etils import epath

# Cloud Storage folders, all nested under <bucket>/evaluate_rag.
WORKSPACE_BUCKET_URI = epath.Path(BUCKET_URI) / "evaluate_rag"
DATA_URI = WORKSPACE_BUCKET_URI / "data"
EVALUATIONS_URI = WORKSPACE_BUCKET_URI / "evaluations"
SRC_URI = WORKSPACE_BUCKET_URI / "src"
PIPELINE_ROOT_URI = WORKSPACE_BUCKET_URI / "pipeline"
TMP_URI = WORKSPACE_BUCKET_URI / "tmp"

for remote_dir in (
    WORKSPACE_BUCKET_URI,
    DATA_URI,
    EVALUATIONS_URI,
    SRC_URI,
    PIPELINE_ROOT_URI,
    TMP_URI,
):
    remote_dir.mkdir(parents=True, exist_ok=True)

### Import libraries

Import the required libraries.

In [None]:
# General
from pathlib import Path
from typing import NamedTuple

from IPython.display import HTML, display

# Model Eval (locally)
from google import auth

# Model Eval (remote)
from google_cloud_pipeline_components.types.artifact_types import VertexDataset
from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp
from kfp import compiler, dsl
from kfp.dsl import Metrics, Output
import pandas as pd
import plotly.graph_objects as go

### Set constants

Set tutorial variables.

In [None]:
# Cloud Storage URI of the JSONL evaluation dataset loaded by this tutorial
INPUT_EVALUATION_DATASET_URI = "gs://github-repo/evaluate-gemini-autosxs-custom-task/evaluation_rag_qa_dataset.jsonl"

# Path (as a string) named "rag_qa_eval" under the workspace evaluations folder
OUTPUT_EVALUATION_DATASET_URI = str(EVALUATIONS_URI / "rag_qa_eval")

### Helpers

Define a helper function to print evaluation results.

In [None]:
def print_content(df: pd.DataFrame, columns: list[str], n: int = 2) -> None:
    """Render randomly sampled rows of the given text columns as HTML.

    Args:
        df: DataFrame holding the text to display.
        columns: Names of the columns to show for every sampled row.
        n: Maximum number of rows to sample. When the DataFrame has fewer
            rows than ``n``, all rows are shown instead of raising an error.
    """

    style = "white-space: pre-wrap; width: 800px; overflow-x: auto;"
    # Sampling without replacement fails when n exceeds the number of rows,
    # so cap the sample size at the size of the DataFrame.
    sample_size = min(n, len(df))
    selected_df = df[columns].sample(n=sample_size)

    for _, row in selected_df.iterrows():
        for column in columns:
            display(
                HTML(f"<h2>{column}:</h2> <div style='{style}'>{row[column]}</div>")
            )
        # Horizontal rule between consecutive rows
        display(HTML("<hr>"))


def visualize_eval_qa_summary_metrics(df: pd.DataFrame) -> None:
    """Plot the mean (bar) and standard deviation (error bar) of each evaluation metric.

    Args:
        df: Aggregated metrics DataFrame. For every plotted metric it must
            contain the columns ``<metric>_score_mean`` and
            ``<metric>_score_std``; the values of the first row are plotted.
    """

    categories = [
        "question_answering_quality",
        "question_answering_helpfulness",
        "fulfillment",
        "question_answering_relevance",
        "groundedness",
    ]

    fig = go.Figure()

    for category in categories:
        # Read from the `df` argument rather than from a notebook-level global,
        # so the helper works with whatever DataFrame the caller passes in.
        mean_value = df[f"{category}_score_mean"].values[0]
        std_value = df[f"{category}_score_std"].values[0]
        fig.add_trace(
            go.Bar(
                x=[category],
                y=[mean_value],
                error_y=dict(
                    type="data",
                    array=[std_value],
                    visible=True,
                ),
                name=category,
            )
        )

    fig.update_layout(
        title="RAG Q&A Rapid Eval Scores (mean and std)",
        xaxis_title="Metric",
        yaxis_title="Score",
        showlegend=False,
    )

    fig.show()

## Evaluate a RAG application for cooking healthy dishes

In this tutorial, you will use a RAG evaluation dataset from a Gen AI application designed to help chef interns cook healthy dishes.

### Load the RAG evaluation dataset

According to the [Rapid Eval API](https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/evaluation), the evaluation dataset to calculate the relevant RAG evaluation metrics should contain the following columns:

- instruction: in this case, it contains the cooking question
- context: in this scenario it contains some facts to answer the cooking question.
- prediction: the answer to the question grounded in the context.  

Check out the [Metrics bundles](https://cloud.google.com/vertex-ai/generative-ai/docs/models/rapid-evaluation#metric-bundles) documentation to learn more about the metrics commonly associated with question-answering tasks.

In [None]:
# Load the JSONL evaluation dataset (one JSON record per line) into a DataFrame
eval_dataset_df = pd.read_json(INPUT_EVALUATION_DATASET_URI, lines=True)

In [None]:
# Preview the first rows of the evaluation dataset
eval_dataset_df.head()

### Evaluate the RAG application using Dataflow ML Remote Inference with Rapid Eval API in Vertex AI Pipelines

To build a Vertex AI Pipeline that evaluates the RAG application using Dataflow ML Remote Inference with Rapid Eval API, you start by defining pipeline components. In this case, you have:

- `validate_eval_dataset` component, which reads the evaluation dataset and runs some data quality checks.

- `eval_qa_batch_pipeline` component, which uses the Kubeflow pipeline pattern of **pipeline as component** to combine the Vertex AI Pipelines `DataflowPythonJobOp` with some helper components and provide a more readable evaluation pipeline.

- `visualize_eval_qa_metrics` component which takes the Rapid Eval API results and returns some aggregated evaluation metrics.

#### Define the `validate_eval_dataset` component

The `validate_eval_dataset` component reads a JSONL file from a Google Cloud Storage URI, validates the data using a defined set of rules, and then outputs two Vertex Datasets: one for valid data and another for invalid data. It leverages multiprocessing and tqdm for efficient processing and includes detailed logging for tracking the validation process. When invalid rows are found, the component logs their indices as an error, saves them to the invalid dataset, and continues with only the valid rows.


In [None]:
@dsl.component(
    base_image="python:3.10",
    packages_to_install=[
        "multiprocess",
        "tqdm",
        "pandas",
        "google-cloud-aiplatform",
        "google_cloud_pipeline_components",
        "etils",
        "importlib_resources",
    ],
)
def validate_eval_dataset(
    input_eval_dataset_uri: str,
    valid_eval_dataset: Output[VertexDataset],
    invalid_eval_dataset: Output[VertexDataset],
) -> NamedTuple(
    "outputs", valid_eval_dataset_file_uri=str, invalid_eval_dataset_file_uri=str
):
    """Validate the input dataset and split it into valid and invalid rows.

    A row is invalid when any of its values is null or an empty string, or when
    `instruction`, `context` or `prediction` is not a string.

    Args:
        input_eval_dataset_uri: `gs://` URI of the JSONL evaluation dataset.
        valid_eval_dataset: Output artifact receiving the valid rows (JSONL).
        invalid_eval_dataset: Output artifact receiving the invalid rows
            (JSONL). The file is always written and is empty when every row
            is valid, so the returned URI always points to an existing object.

    Returns:
        The URIs of the valid and invalid JSONL files.
    """

    import logging
    import os

    import multiprocess as mp
    import pandas as pd
    from tqdm.auto import tqdm

    def validate_row(row_tuple: tuple[int, pd.Series]) -> int | None:
        """Return the index label of the row when it is invalid, otherwise None."""
        index, row = row_tuple
        if row.isnull().any() or (row == "").any():
            return index

        if not (
            isinstance(row["instruction"], str)
            and isinstance(row["context"], str)
            and isinstance(row["prediction"], str)
        ):
            return index

        return None

    def validate_dataframe(df: pd.DataFrame, num_processes: int) -> list[int]:
        """Validate all rows in a process pool and return the invalid index labels."""

        with mp.Pool(processes=num_processes) as pool:
            results = list(
                tqdm(
                    pool.imap(validate_row, df.iterrows()),
                    total=len(df),
                    desc="Validating DataFrame",
                )
            )
        return [index for index in results if index is not None]

    # Set up logging
    logging.basicConfig(level=logging.INFO)

    # Determine the maximum number of processes
    max_processes = os.cpu_count()

    # Read the dataset through the /gcs/ mount of the bucket
    logging.info(f"Reading dataset from {input_eval_dataset_uri}")
    input_dataset_path = input_eval_dataset_uri.replace("gs://", "/gcs/")
    eval_df = pd.read_json(input_dataset_path, lines=True)

    # Validate the dataset
    logging.info("Validating dataset")
    invalid_indices = validate_dataframe(eval_df, num_processes=max_processes)

    invalid_eval_dataset_file_path = invalid_eval_dataset.path + ".jsonl"
    valid_eval_dataset_file_path = valid_eval_dataset.path + ".jsonl"

    # `iterrows()` yields index labels, so select by label (`loc`) to stay
    # consistent with the label-based `drop(index=...)` below.
    invalid_df = eval_df.loc[invalid_indices]
    valid_eval_df = eval_df.drop(index=invalid_indices)

    if invalid_indices:
        logging.error(f"DataFrame is invalid! Invalid row indices: {invalid_indices}.")
        logging.info(f"Saving invalid rows to {invalid_eval_dataset_file_path}")
    else:
        logging.info("DataFrame is valid!")
        logging.info(
            f"No invalid rows; writing an empty file to {invalid_eval_dataset_file_path}"
        )
    invalid_df.to_json(invalid_eval_dataset_file_path, orient="records", lines=True)

    logging.info(f"Saving valid rows to {valid_eval_dataset_file_path}")
    valid_eval_df.to_json(valid_eval_dataset_file_path, orient="records", lines=True)

    # Point the artifacts at the files written above
    valid_eval_dataset.uri = valid_eval_dataset.uri + ".jsonl"
    invalid_eval_dataset.uri = invalid_eval_dataset.uri + ".jsonl"
    component_outputs = NamedTuple(
        "outputs", valid_eval_dataset_file_uri=str, invalid_eval_dataset_file_uri=str
    )
    return component_outputs(valid_eval_dataset.uri, invalid_eval_dataset.uri)

#### Define the `eval_qa_batch_pipeline` component

The `eval_qa_batch_pipeline` component uses `DataflowPythonJobOp` to calculate RAG Q&A evaluation metrics using Rapid Eval API. The `DataflowPythonJobOp` operator is used within Vertex AI Pipelines to process data using Apache Beam. It submits Python-based Beam jobs to Dataflow for execution. The Dataflow Runner handles code execution, uploading it and its dependencies to Cloud Storage before creating a job that runs your Beam pipeline on Dataflow.

The `DataflowPythonJobOp` component takes the following parameters:

- `project_id`: The project ID.
- `location`: The region.
- `python_module_path`: The Cloud Storage location of the Apache Beam pipeline to run RAG Q&A evaluation task.
- `temp_location`: The Cloud Storage temporary file workspace for the Apache Beam pipeline.
- `requirements_file_path`: The required Python modules to install.
- `args`: The arguments to pass to the Apache Beam pipeline.

Learn more about [Google Cloud Pipeline Component for Dataflow.](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.experimental.dataflow.html)

##### Write the Apache Beam pipeline module

First, you write a Python script that utilizes Apache Beam to process a JSONL file containing model predictions and evaluate them using the Rapid Eval API. The script takes the input and output file paths as command-line arguments; any remaining arguments are passed to Apache Beam as pipeline options. It then reads the input data, prepares metric requests, and sends them to the Rapid Eval API for evaluation. The script handles multiple metrics simultaneously, grouping the results by a unique ID, and writes the final evaluations to the specified output JSONL file.

In [None]:
custom_inference_module = '''

from __future__ import absolute_import

# General libraries
import argparse
import json
import hashlib
from etils import epath
from pathlib import Path
import pandas as pd
from typing import List, Dict, Tuple
from google import auth
from google.auth.transport import requests as google_auth_requests
from google.api_core import exceptions as google_exceptions
from requests import exceptions
import logging
import backoff

# Apache Beam libraries
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.ml.inference.base import ModelHandler, RunInference

# Library settings
logging.getLogger().setLevel(logging.INFO)

# ----------------------------------------------------------------------------
# Helper Functions
# ----------------------------------------------------------------------------

def parse_element(element: str) -> Dict:
  """Deserialize one JSON-encoded line into a dictionary."""
  record = json.loads(element)
  return record

def unparse_element(element: Dict) -> str:
  """Serialize a dictionary into its JSON string representation."""
  serialized = json.dumps(element)
  return serialized

def generate_id(element: Dict) -> Tuple:
  """
  Key a record by the SHA-256 hex digest of its text fields.

  The digest covers instruction + context + prediction; the instruction is
  left out when it is missing or empty. Returns a (digest, element) pair.
  """
  instruction = element.get("instruction", "")
  context = element.get("context", "")
  prediction = element.get("prediction", "")

  text = (instruction + context if instruction else context) + prediction
  digest = hashlib.sha256(text.encode()).hexdigest()
  return (digest, element)

def get_metric_request(element: Tuple, metric_name: str) -> Tuple:
    """
    Build the Rapid Eval request payload for one keyed record.

    Args:
        element: An (id, record) pair; the record is a dictionary that may hold
            the instruction, context and prediction.
        metric_name: The name of the Rapid Eval metric to evaluate.

    Returns:
        An (id, request) pair, where the request is a dictionary with a single
        "<metric_name>_input" entry holding the instance and an empty metric spec.
    """

    key, record = element

    # Fields sent for the metrics that do not use all three inputs;
    # every other metric receives instruction, context and prediction.
    fields_by_metric = {
        "groundedness": ("context", "prediction"),
        "fulfillment": ("instruction", "prediction"),
    }
    fields = fields_by_metric.get(metric_name, ("instruction", "context", "prediction"))

    request = {
        f"{metric_name}_input": {
            "instance": {field: record.get(field) for field in fields},
            "metric_spec": {},
        }
    }

    return key, request


def get_evaluation_output(element: Tuple) -> Dict:
    """
    Flatten the grouped Rapid Eval results of one record into a single dictionary.

    Args:
        element: An (id, evaluations) pair, where evaluations maps each metric
            name to the collection of outputs produced for that record. Only
            the first output of each metric is used.

    Returns:
        A dictionary with the instruction, context and prediction of the record
        plus one "<metric>_<field>" entry for every other field of each metric
        output. Metrics without any output are skipped.
    """

    _, evaluations = element
    input_fields = ("instruction", "context", "prediction")

    # Keep the first output of every metric; a metric can have no output at all
    # (for example when its request failed), in which case it is skipped.
    first_outputs = {}
    for metric_name, metric_data in evaluations.items():
        outputs = list(metric_data)
        if outputs:
            first_outputs[metric_name] = outputs[0]

    result = {}

    # Not every metric carries all input fields (e.g. groundedness has no
    # instruction), so fill the missing ones from the following metrics and
    # stop as soon as every field has a value.
    for output in first_outputs.values():
      for field in input_fields:
        if result.get(field, '') == '':
          result[field] = output.get(field, '')
      if all(value != '' for value in result.values()):
        break

    for metric_name, output in first_outputs.items():
      for sub_key, sub_value in output.items():
          if sub_key not in input_fields:
              result[f"{metric_name}_{sub_key}"] = sub_value

    return result

# ----------------------------------------------------------------------------
# Custom Model Handler for RapidEval API
# ----------------------------------------------------------------------------

class RapidEvalAPIModelHandler(ModelHandler):
  """Model handler that sends keyed metric requests to the Rapid Eval API.

  Each batch item is an (id, request) pair; each result is an (id, output)
  pair where the output merges the request instance with the first entry of
  the API response.
  """

  def __init__(self, project, region, base_uri) -> None:
    """Resolve default credentials and build the endpoint URI.

    Args:
      project: Project ID used to fill the URI template.
      region: Region used to fill the URI template (host and location).
      base_uri: URI template with three positional placeholders, filled with
        region, project and region in that order.
    """
    self.creds, _ = auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
    self.uri = base_uri.format(region, project, region)

  def load_model(self):
    """Create an authorized session for API requests."""
    return google_auth_requests.AuthorizedSession(self.creds)

  def run_inference(self, batch, model, inference=None):
    """Send each request of the batch to the Rapid Eval API.

    Args:
      batch: Iterable of (id, request) pairs.
      model: The authorized session returned by load_model.
      inference: Extra inference arguments; not used by this handler.

    Returns:
      A list of (id, output) pairs for the items that were processed
      successfully. An item whose response cannot be parsed is logged and
      skipped without dropping the remaining items of the batch. HTTP errors
      that persist after the retries are not caught and propagate.
    """

    @backoff.on_exception(backoff.expo,
                          exceptions.HTTPError,
                          max_tries=5, max_time=320)
    def send_request(payload):
      """Post one request, retrying HTTP errors with exponential backoff."""
      response = model.post(self.uri, json=payload)
      response.raise_for_status()
      return response

    evaluations = []
    for id, element in batch:
      # Handle failures per item so that one bad item does not discard the
      # results of the items that follow it in the same batch.
      try:
        # The request has a single "<metric>_input" entry; fall back to an
        # empty dict so the merge below cannot fail on a missing instance.
        instance = element[next(iter(element))].get('instance') or {}
        result = send_request(element).json()
        result_key = next(iter(result))
        output_data = {**instance, **result[result_key]}
        evaluations.append((id, output_data))
      except (google_exceptions.GoogleAPICallError, KeyError, json.JSONDecodeError) as e:
        logging.error(f"Error processing item {id}: {e}")

    return evaluations

# ----------------------------------------------------------------------------
# Main Apache Beam Pipeline
# ----------------------------------------------------------------------------

def run(argv=None):
    """Parse the command line, then build and run the evaluation pipeline.

    The pipeline reads JSONL records, keys them, evaluates every record against
    each Rapid Eval metric, groups the results by key and writes them as JSONL.
    """

    # Command line arguments: --input/--output are consumed here, everything
    # else is handed to Beam as pipeline options.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=True, help="Input JSONL file to process.")
    parser.add_argument("--output", dest="output", required=True, help="Output JSONL file to write results to.")
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    all_options = pipeline_options.get_all_options()
    project = all_options['project']
    region = all_options['region']

    base_uri = "https://{}-aiplatform.googleapis.com/v1beta1/projects/{}/locations/{}:evaluateInstances"
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # (metric name, label of the request-building step, label of the inference step)
    metric_steps = [
        ("question_answering_quality", "PrepareQAQualityInput", "RunQAQualityEvaluation"),
        ("question_answering_helpfulness", "PrepareQAHelpfulnessInput", "RunQAHelpfulnessEvaluation"),
        ("question_answering_relevance", "PrepareQARelvanceInput", "RunQARelvanceEvaluation"),
        ("groundedness", "PrepareGroundednessInput", "RunGroundednessEvaluation"),
        ("fulfillment", "PrepareFulfillmentInput", "RunFulfillmentEvaluation"),
    ]

    with beam.Pipeline(options=pipeline_options) as p:

      keyed_records = (
          p
          | "ReadRecords" >> ReadFromText(known_args.input)
          | "ParseInputJSON" >> beam.Map(parse_element)
          | "AddKey" >> beam.Map(generate_id)
      )

      # One branch per metric, each with its own model handler instance.
      results = {}
      for metric_name, prepare_label, run_label in metric_steps:
        results[metric_name] = (
            keyed_records
            | prepare_label >> beam.Map(get_metric_request, metric_name=metric_name)
            | run_label >> RunInference(model_handler=RapidEvalAPIModelHandler(project, region, base_uri))
        )

      output_data = (
          results
          | 'GroupEvalbyKey' >> beam.CoGroupByKey()
          | 'PrepareEvaluations' >> beam.Map(get_evaluation_output)
          | 'UnparseOutputData' >> beam.Map(unparse_element)
      )
      output_data | "WriteEvaluations" >> WriteToText(known_args.output, file_name_suffix='.jsonl')

if __name__ == "__main__":
  run()
'''

# Write the Apache Beam module source to main.py under the workspace src folder
with epath.Path(SRC_URI / "main.py").open("w") as f:
    f.write(custom_inference_module)

##### Write the requirements

Next, create the `requirements.txt` file to specify Python modules that are required.

In [None]:
# Contents of requirements.txt, one package per line
requirements_file = """
future
importlib_resources
etils
backoff
apache-beam
google-cloud-aiplatform
google-auth>=2.26.1
"""

# Write requirements.txt next to main.py under the workspace src folder
with epath.Path(SRC_URI / "requirements.txt").open("w") as f:
    f.write(requirements_file)

##### Write the setup.py

Next, create the `setup.py` file to specify Python modules that are required to be installed for executing the Dataflow workers.


In [None]:
# Source of setup.py declaring the package name, version and required packages
setup_module = """
import setuptools

REQUIRED_PACKAGES = [
    'future',
    'importlib_resources',
    'etils',
    'backoff',
    'google-cloud-aiplatform',
    'google-auth>=2.26.1'
]
PACKAGE_NAME = 'eval_qa_rapid_api'
PACKAGE_VERSION = '0.0.1'
setuptools.setup(
    name=PACKAGE_NAME,
    version=PACKAGE_VERSION,
    description='Demo for evaluating question answering using Rapid Eval API',
    install_requires=REQUIRED_PACKAGES,
    author="author@google.com",
    packages=setuptools.find_packages()
)
"""

# Write setup.py next to main.py under the workspace src folder
with epath.Path(SRC_URI / "setup.py").open("w") as f:
    f.write(setup_module)

##### Define the evaluation pipeline component

Finally, you assemble the `eval_qa_batch_pipeline` pipeline component. The pipeline prepares the Dataflow ML job arguments (`prepare_args_op` component), runs the Dataflow ML job and waits for it to complete (`DataflowPythonJobOp` and `WaitGcpResourcesOp` components), and returns the output evaluation dataset file URI (`get_output_eval_file_uri_op` component).

In [None]:
@dsl.component(base_image="python:3.10")
def prepare_arguments(
    valid_eval_dataset_file_uri: str, output_eval_dataset_uri: str, args: str
) -> NamedTuple("outputs", args=list):
    """Build the argument list for the Dataflow job.

    The JSON-encoded `args` list is decoded and prefixed with the `--input`
    and `--output` flags and their values.
    """
    import json

    io_flags = [
        "--input",
        valid_eval_dataset_file_uri,
        "--output",
        output_eval_dataset_uri,
    ]
    job_args = io_flags + json.loads(args)

    outputs = NamedTuple("outputs", args=list)
    return outputs(job_args)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=[
        "google_cloud_pipeline_components",
        "etils",
        "importlib_resources",
    ],
)
def get_output_eval_file_uri(
    output_eval_dataset_uri: str, output_eval_dataset: Output[VertexDataset]
) -> NamedTuple("outputs", output_eval_dataset_file_uri=str):
    """Merge the JSONL files written under the output prefix into one file.

    Args:
        output_eval_dataset_uri: `gs://` output prefix of the evaluation job.
            Files in its parent folder whose name starts with the last path
            component of the prefix and ends with `.jsonl` are merged.
        output_eval_dataset: Output artifact receiving the merged JSONL file.

    Returns:
        The URI of the merged JSONL file.
    """
    import logging

    from etils import epath

    # Set up logging
    logging.basicConfig(level=logging.INFO)

    # Locate the output files through the /gcs/ mount of the bucket
    logging.info(f"Reading dataset from {output_eval_dataset_uri}")
    output_prefix = epath.Path(output_eval_dataset_uri.replace("gs://", "/gcs/"))

    # Only pick up files that belong to this output prefix, so unrelated JSONL
    # files in the same folder are not merged in; sort them so that the merged
    # file has a deterministic order.
    output_eval_dataset_file_paths = sorted(
        str(p)
        for p in output_prefix.parent.glob("*.jsonl")
        if p.name.startswith(output_prefix.name)
    )
    if not output_eval_dataset_file_paths:
        logging.warning(f"No JSONL files found for {output_eval_dataset_uri}")

    # Concatenate the files into a single JSONL file
    output_eval_dataset_file_path = output_eval_dataset.path + ".jsonl"

    logging.info(f"Writing metrics file at {output_eval_dataset_file_path}")
    with open(output_eval_dataset_file_path, "w") as outfile:
        for output_eval_dataset_file in output_eval_dataset_file_paths:
            with open(output_eval_dataset_file) as interfile:
                for line in interfile:
                    outfile.write(line)

    # Point the artifact at the merged file
    output_eval_dataset.uri = output_eval_dataset.uri + ".jsonl"
    component_outputs = NamedTuple("outputs", output_eval_dataset_file_uri=str)
    return component_outputs(output_eval_dataset.uri)


# Pipeline used as a component: prepares the job arguments, launches the
# Dataflow job, waits for it, and returns the URI of the evaluation file.
# NOTE(review): `staging_dir` is accepted but not referenced in this body.
@dsl.pipeline
def eval_qa_batch_pipeline(
    valid_eval_dataset_file_uri: str,
    output_eval_dataset_uri: str,
    args: str,
    requirements_file_path: str,
    python_file_path: str,
    temp_location: str,
    project_id: str,
    location: str,
    staging_dir: str,
) -> str:

    # Prepare the Dataflow ML job arguments
    prepare_args_op = prepare_arguments(
        valid_eval_dataset_file_uri=valid_eval_dataset_file_uri,
        output_eval_dataset_uri=output_eval_dataset_uri,
        args=args,
    ).set_display_name("Prepare arguments")

    # Launch the Dataflow ML job with the prepared arguments
    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location=temp_location,
        requirements_file_path=requirements_file_path,
        args=prepare_args_op.outputs["args"],
    ).set_display_name("Prepare Dataflow ML Evaluation Job")

    # Wait on the resources reported by the Dataflow step
    wait_op = (
        WaitGcpResourcesOp(gcp_resources=dataflow_python_op.outputs["gcp_resources"])
        .set_display_name("Run Dataflow ML Evaluation Job")
        .after(dataflow_python_op)
    )

    # Get the output metrics uri; ordered explicitly after the wait step
    # because it takes no input from it
    get_output_eval_file_uri_op = (
        get_output_eval_file_uri(output_eval_dataset_uri=output_eval_dataset_uri)
        .set_display_name("Get evaluation file")
        .after(wait_op)
    )

    return get_output_eval_file_uri_op.outputs["output_eval_dataset_file_uri"]

#### Define the `visualize_eval_qa_metrics` component

The `visualize_eval_qa_metrics` component calculates the mean and standard deviation for each score column in the Rapid Eval API resulting dataset using multiprocessing for efficiency. The component logs these metrics and saves them into a JSONL file.

In [None]:
@dsl.component(
    base_image="python:3.10",
    packages_to_install=[
        "multiprocess",
        "tqdm",
        "numpy",
        "pandas",
        "google-cloud-aiplatform",
        "google_cloud_pipeline_components",
        "etils",
        "importlib_resources",
    ],
)
def visualize_eval_qa_metrics(
    output_eval_dataset_file_uri: str, output_eval_summary_metrics: Output[Metrics]
) -> NamedTuple("outputs", output_eval_summary_metrics_file_uri=str):
    """Aggregate the row-level evaluation scores into summary metrics.

    Reads the JSONL evaluation dataset, computes the mean and standard
    deviation of every column whose name ends in `_score`, logs each value on
    the output metrics artifact and writes all of them to a `.jsonl` file.

    Args:
        output_eval_dataset_file_uri: `gs://` URI of the JSONL file holding the
            row-level evaluation results.
        output_eval_summary_metrics: Output metrics artifact that receives the
            logged values; its URI gets a `.jsonl` suffix appended.

    Returns:
        A named tuple whose `output_eval_summary_metrics_file_uri` field is the
        URI of the written summary file.
    """

    import json
    import logging
    import os

    import multiprocess as mp
    import numpy as np
    import pandas as pd
    from tqdm.auto import tqdm

    def _mean_and_std(scores: pd.Series) -> tuple[float, float]:
        """Return the (mean, standard deviation) pair of one score column."""
        mean_value = np.mean(scores)
        std_value = np.std(scores)
        return mean_value, std_value

    def _summarise_score_columns(
        dataset_uri: str, worker_count: int
    ) -> dict[str, float]:
        """Build the `<column>_mean` / `<column>_std` mapping for all score columns."""
        # The dataset is read from the `/gcs/` path that corresponds to the URI.
        local_path = dataset_uri.replace("gs://", "/gcs/")
        results_df = pd.read_json(local_path, lines=True)

        score_names = [name for name in results_df.columns if name.endswith("_score")]
        score_series = [results_df[name] for name in score_names]

        # One task per score column, with a progress bar over the results.
        with mp.Pool(processes=worker_count) as pool:
            progress = tqdm(
                pool.imap(_mean_and_std, score_series),
                total=len(score_names),
                desc="Calculating statistics",
            )
            stats = list(progress)

        summary = {}
        for name, (mean_value, std_value) in zip(score_names, stats):
            summary[f"{name}_mean"] = round(float(mean_value), 3)
            summary[f"{name}_std"] = round(float(std_value), 3)
        return summary

    logging.basicConfig(level=logging.INFO)

    # Use as many worker processes as the machine reports CPUs.
    available_cpus = os.cpu_count()

    logging.info("Generating metadata")
    metrics = _summarise_score_columns(
        output_eval_dataset_file_uri, worker_count=available_cpus
    )

    # Record every aggregated value on the output metrics artifact.
    logging.info("Logging metrics")
    for metric_name, metric_value in metrics.items():
        output_eval_summary_metrics.log_metric(metric_name, metric_value)

    # Persist the aggregated values; the path is taken before the URI is changed.
    metrics_file_path = output_eval_summary_metrics.path + ".jsonl"
    logging.info(f"Writing metrics file at {metrics_file_path}")
    with open(metrics_file_path, "w") as outfile:
        json.dump(metrics, outfile)

    # Point the artifact at the file that was just written and return its URI.
    output_eval_summary_metrics.uri = output_eval_summary_metrics.uri + ".jsonl"
    outputs = NamedTuple("outputs", output_eval_summary_metrics_file_uri=str)
    return outputs(output_eval_summary_metrics.uri)

#### Define your workflow using Kubeflow Pipelines DSL package

You assemble the pipeline using the defined components according to the evaluation workflow.

In [None]:
@dsl.pipeline(
    name="eval-rag-batch-pipeline",
    description="Evaluating question answering using Rapid Eval API",
)
def pipeline(
    input_eval_dataset_uri: str,
    output_eval_dataset_uri: str,
    args: str,
    requirements_file_path: str,
    python_file_path: str,
    temp_location: str,
    project_id: str,
    location: str,
    staging_dir: str,
):
    """Top-level workflow: validate the dataset, evaluate it, then summarise the scores.

    Args:
        input_eval_dataset_uri: URI of the evaluation dataset to validate.
        output_eval_dataset_uri: URI under which the evaluation output is written.
        args: Extra job arguments, forwarded to the evaluation sub-pipeline.
        requirements_file_path: Forwarded to the evaluation sub-pipeline.
        python_file_path: Forwarded to the evaluation sub-pipeline.
        temp_location: Forwarded to the evaluation sub-pipeline.
        project_id: Forwarded to the evaluation sub-pipeline.
        location: Forwarded to the evaluation sub-pipeline.
        staging_dir: Forwarded to the evaluation sub-pipeline.
    """

    # Stage 1: validate the input dataset.
    validation_task = validate_eval_dataset(
        input_eval_dataset_uri=input_eval_dataset_uri
    )
    validation_task = validation_task.set_display_name("Validate RAG Eval dataset")

    # Stage 2: run the evaluation sub-pipeline on the validated dataset file.
    validated_file_uri = validation_task.outputs["valid_eval_dataset_file_uri"]
    evaluation_task = eval_qa_batch_pipeline(
        valid_eval_dataset_file_uri=validated_file_uri,
        output_eval_dataset_uri=output_eval_dataset_uri,
        args=args,
        requirements_file_path=requirements_file_path,
        python_file_path=python_file_path,
        temp_location=temp_location,
        project_id=project_id,
        location=location,
        staging_dir=staging_dir,
    )
    evaluation_task = evaluation_task.set_display_name("RAG Q&A Evaluation")
    evaluation_task = evaluation_task.after(validation_task)

    # Stage 3: read the evaluated dataset and report the mean and standard
    # deviation of its score columns.
    summary_task = visualize_eval_qa_metrics(
        output_eval_dataset_file_uri=evaluation_task.output
    )
    summary_task = summary_task.set_display_name("Visualize RAG Eval metrics")
    summary_task.after(evaluation_task)

#### Compile your pipeline into a YAML file

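After the workflow of your pipeline is defined, you compile the pipeline into a pipeline definition file. Note that this notebook writes the definition to `eval_pipeline.json` rather than to a `.yaml` file.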

In [None]:
# Compile the `pipeline` function into a pipeline definition file. The target
# path is built with the `/` path operator, i.e. the same expression that is
# later passed to `aiplatform.PipelineJob` as `template_path`, instead of
# concatenating strings with a hard-coded separator.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=str(PIPELINE_PATH / "eval_pipeline.json"),
)

#### Submit your pipeline run

After your pipeline is compiled into the pipeline definition file (`eval_pipeline.json`), you use the Vertex AI Python SDK to submit and run it.

The pipeline requires **~15 mins** to run.

In [None]:
# Runtime parameters, keyed by the argument names of the `pipeline` function.
pipeline_params = dict(
    input_eval_dataset_uri=INPUT_EVALUATION_DATASET_URI,
    output_eval_dataset_uri=OUTPUT_EVALUATION_DATASET_URI,
    # Flag/value pairs passed through the pipeline's `args` parameter.
    args=[
        "--runner",
        "DataflowRunner",
        "--setup_file",
        str(SRC_URI / "setup.py"),
        "--project",
        PROJECT_ID,
        "--region",
        REGION,
    ],
    requirements_file_path=str(SRC_URI / "requirements.txt"),
    python_file_path=str(SRC_URI / "main.py"),
    temp_location=str(TMP_URI),
    project_id=PROJECT_ID,
    location=REGION,
    staging_dir=str(PIPELINE_ROOT_URI),
)

# Pipeline job built from the compiled definition file; caching is switched
# off for this run.
pipeline_job = aiplatform.PipelineJob(
    display_name="evaluate_rag_batch_eval",
    template_path=str(PIPELINE_PATH / "eval_pipeline.json"),
    pipeline_root=str(PIPELINE_ROOT_URI),
    parameter_values=pipeline_params,
    enable_caching=False,
)

In [None]:
# Submit the pipeline job configured in the previous cell and run it; the
# notebook text estimates about 15 minutes for the run.
pipeline_job.run()

### Get evaluation results

After the pipeline run has completed successfully, you can retrieve the RAG eval metrics at both the row level and the aggregated level.

#### Row-level metrics


In [None]:
# Locate the task that exposes the row-level evaluation file. Fail loudly when
# it is missing: a plain `for ... break` loop would fall through and leave
# `details` bound to whichever task happened to be listed last, which then
# surfaces as a confusing lookup error (or wrong data) further down.
details = next(
    (
        task
        for task in pipeline_job.task_details
        if task.task_name == "get-output-eval-file-uri"
    ),
    None,
)
if details is None:
    raise RuntimeError(
        "Task 'get-output-eval-file-uri' was not found in the pipeline run; "
        "check that the pipeline run completed successfully."
    )

# row-level-metrics: read the JSONL file referenced by the task's output
# artifact and preview the first rows.
rapid_eval_row_metrics_uri = details.outputs["output_eval_dataset"].artifacts[0].uri
rapid_eval_row_metrics_df = pd.read_json(rapid_eval_row_metrics_uri, lines=True)
print_content(rapid_eval_row_metrics_df, columns=rapid_eval_row_metrics_df.columns, n=3)

#### Aggregate metrics

In [None]:
# Locate the task that produced the aggregated metrics. Fail loudly when it is
# missing: a plain `for ... break` loop would fall through and leave `details`
# bound to whichever task happened to be listed last.
details = next(
    (
        task
        for task in pipeline_job.task_details
        if task.task_name == "visualize-eval-qa-metrics"
    ),
    None,
)
if details is None:
    raise RuntimeError(
        "Task 'visualize-eval-qa-metrics' was not found in the pipeline run; "
        "check that the pipeline run completed successfully."
    )

# aggregated-metrics: read the JSONL summary referenced by the task's output
# artifact and render it.
rapid_eval_aggregated_metrics_uri = (
    details.outputs["output_eval_summary_metrics"].artifacts[0].uri
)
rapid_eval_aggregated_metrics_df = pd.read_json(
    rapid_eval_aggregated_metrics_uri, lines=True
)
visualize_eval_qa_summary_metrics(rapid_eval_aggregated_metrics_df)

## Cleaning up

In [None]:
delete_bucket = False
delete_pipeline = False
delete_tutorial_dir = False

if delete_bucket:
    ! gsutil -m rm -r $BUCKET_URI

if delete_pipeline:
    pipeline_list = aiplatform.PipelineJob.list()
    for pipeline in pipeline_list:
        if pipeline.display_name == "evaluate_rag_batch_eval":
            pipeline.delete()

if delete_tutorial_dir:
    import shutil

    shutil.rmtree(str(TUTORIAL_PATH))