In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# FraudFinder - BigQuery ML - Model training pipeline formalization

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/bqml/05_model_training_pipeline_formalization.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/bqml/05_model_training_pipeline_formalization.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/bqml/05_model_training_pipeline_formalization.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the FraudFinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model.


### Objective

In this tutorial, you will learn how to:

- Train a Logistic Regression model using BigQuery ML (BQML)
- Evaluate the BQML model
- Run an evaluation job 
- Register the model on Vertex AI Model Registry
- Create a Vertex AI Endpoint and deploy the BQML model to the Endpoint

This tutorial uses the following Google Cloud services:
- [BigQuery](https://cloud.google.com/bigquery/)
- [BigQueryML](https://cloud.google.com/bigquery-ml/)
- [Vertex AI](https://cloud.google.com/vertex-ai/)

The steps performed include:
- Build custom components for the pipeline
- Using Kubeflow Pipeline (KFP) DSL to build an end-to-end pipeline
- Compile the Pipeline
- Submit and execute the pipeline

### Costs 
This tutorial uses billable components of Google Cloud:
* BigQuery
* BigQuery ML
* Vertex AI
* Google Cloud Storage

Learn about [BigQuery Pricing](https://cloud.google.com/bigquery/pricing), [BigQuery ML pricing](https://cloud.google.com/bigquery-ml/pricing), [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing), and use the [Pricing Calculator](https://cloud.google.com/products/calculator/)

### Load configuration settings from the setup notebook

First, you will need to set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook.

In [None]:
# Resolve the active gcloud project and derive the FraudFinder bucket name from it.
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
BUCKET_NAME = f"{PROJECT_ID}-fraudfinder"
# Fetch the Python config file stored in the bucket by the setup notebook.
# `config.n` is the captured shell output joined with newlines (IPython SList).
config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
print(config.n)
# Execute the config so its variables become notebook globals. Later cells read
# names such as REGION, MODEL_NAME and ENDPOINT_NAME that are not defined in this
# notebook; presumably they come from this config — verify against the setup notebook.
# NOTE(review): exec runs whatever is stored in the bucket; only use a bucket you control.
exec(config.n)

### Import libraries

In [None]:
import json
from typing import NamedTuple, Optional
from datetime import datetime, timedelta

import google.cloud.aiplatform as vertex_ai
# kfp and cloud components
import kfp
from google.cloud import bigquery
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.bigquery import (
    BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp,
    BigqueryExplainPredictModelJobOp, BigqueryExportModelJobOp,
    BigqueryPredictModelJobOp, BigqueryQueryJobOp)
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                          ModelDeployOp)
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import HTML, Artifact, Condition, Input, Output, component

### Define constants

In [None]:
# General
# Training window: both bounds are yesterday's date. `datetime.today()` is read
# once so the two bounds cannot disagree if the cell runs across midnight.
_YESTERDAY = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
START_DATE_TRAIN = _YESTERDAY
END_DATE_TRAIN = _YESTERDAY
SERVING_FEATURE_IDS = {"customer": ["*"], "terminal": ["*"]}
# Ground-truth table for the last training day, in the `tx` dataset.
READ_INSTANCES_TABLE = f"ground_truth_{END_DATE_TRAIN}"
READ_INSTANCES_URI = f"bq://{PROJECT_ID}.tx.{READ_INSTANCES_TABLE}"
# Training table name, with the dashes stripped from the date (train_table_YYYYMMDD).
BQ_TABLE_NAME = f"train_table_{END_DATE_TRAIN.replace('-', '')}"
TRAIN_TABLE_URI = f"bq://{PROJECT_ID}.tx.{BQ_TABLE_NAME}"
# GCS location where Vertex AI Pipelines stores pipeline artifacts.
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root/ff"
PIPELINE_DISPLAY_NAME = "bqml-pipeline-ff"
# MODEL_NAME is not defined in this notebook; presumably it comes from the
# setup config loaded earlier.
MODEL_NAME_PIPELINE = f"{MODEL_NAME}_pipeline"
# Local path of the compiled pipeline spec.
PACKAGE_PATH = "bqml-pipeline-ff.json"

### Initialize Vertex AI SDK
Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
# Point the SDK at this project and region, and give it a GCS staging location.
# staging_bucket expects a gs:// URI. The location is set explicitly so the SDK
# does not fall back to its default region when REGION (from the setup config) differs.
vertex_ai.init(project=PROJECT_ID, location=REGION, staging_bucket=f"gs://{BUCKET_NAME}")

### Set project folder
Set the path where we will store the Kubeflow Pipelines Component. 

In [None]:
# Local directory where the compiled custom component spec (YAML) is written.
KFP_COMPONENTS_PATH = "components"
# `-p` makes re-runs harmless. `-m 777` applies only to the final directory in
# each path, which is why the parent directory gets its own mkdir call.
! mkdir -m 777 -p {KFP_COMPONENTS_PATH}
! mkdir -m 777 -p {KFP_COMPONENTS_PATH}/custom_components

### Create Custom Components
In this notebook we will be using a mix of pre-built BigQuery ML components and custom components. The difference is:

* Pre-built components are official Google Cloud Pipeline Components ([GCPC](https://cloud.google.com/vertex-ai/docs/pipelines/gcpc-list)). The Google Cloud Pipeline Components (GCPC) SDK provides a set of prebuilt components that are production quality, consistent, performant, and easy to use in Vertex AI Pipelines. 
* The custom component, as you will build in the cell below, is typically a component authored by a data scientist or ML engineer. This means that you have more control over the code running in the component (container). In this case it's a [Python-function-based component](https://www.kubeflow.org/docs/components/pipelines/v1/sdk/python-function-components/). 

In the next two cells you will build two custom components:
* The first component will take the evaluation metrics of your model and return the model accuracy so that it can be tracked as metadata. 
* The second component will deploy our model from the Vertex AI Model Registry into a Vertex AI Endpoint. 

Pre-built components provide the benefit of being easy to use, while custom components provide more flexibility beyond the capabilities of pre-built components. 

#### Build a custom component that will fetch the eval metric 
This custom component will retrieve the evaluation metric and it will be used downstream and stored as metadata. This way we can keep track of our model performance. In order to take a Python function and turn it into a component we will use the `@component` decorator. 

In [None]:
@component(
    base_image="python:3.8-slim",
    packages_to_install=["jinja2", "pandas", "matplotlib"],
    output_component_file=f"{KFP_COMPONENTS_PATH}/custom_components/build_bq_evaluate_metrics.yaml",
)
def get_model_evaluation_metrics(
    metrics_in: Input[Artifact],
) -> NamedTuple("Outputs", [("accuracy", float)]):
    """
    Extract the accuracy from a BigQuery ML evaluation-metrics artifact.

    KFP runs this function in its own container, so its dependencies are
    imported inside the function body.

    Args:
        metrics_in: metrics artifact (in this notebook, the output of
            BigqueryEvaluateModelJobOp). Its metadata is read as a tabular result:
            ``schema["fields"][j]["name"]`` gives the column names and
            ``rows[i]["f"][j]["v"]`` gives the cell values.
    Returns:
        accuracy: accuracy from the first metrics row, rounded to 3 decimals.
    Raises:
        ValueError: if the artifact contains no rows or no ``accuracy`` column.
    """
    from typing import NamedTuple

    import pandas as pd

    # Extract rows and schema from the metrics artifact.
    rows = metrics_in.metadata["rows"]
    schema = metrics_in.metadata["schema"]

    # Convert into a tabular format.
    columns = [field["name"] for field in schema["fields"] if "name" in field]
    records = [[cell["v"] for cell in row["f"]] for row in rows]

    # Fail with a clear message instead of an obscure pandas error downstream.
    if not records:
        raise ValueError("Evaluation metrics artifact contains no rows.")
    if "accuracy" not in columns:
        raise ValueError(f"No 'accuracy' column in evaluation metrics: {columns}")

    metrics = pd.DataFrame.from_records(records, columns=columns).astype(float).round(3)

    # Take a scalar explicitly. float() on a whole Series is deprecated and
    # fails when there is more than one row.
    accuracy = round(float(metrics["accuracy"].iloc[0]), 3)
    component_outputs = NamedTuple("Outputs", [("accuracy", float)])

    return component_outputs(accuracy)

#### Custom component that will deploy our model from the Vertex AI Model Registry into a Vertex AI Endpoint. 
This custom component will take our BQML model that is registered on Vertex AI Model Registry and deploy it into our Vertex AI Endpoint. This will be the last step in our end-to-end pipeline. 

In [None]:
@component(
    base_image="python:3.8-slim",
    packages_to_install=["google-cloud-aiplatform"],
)
def upload_model_enpoint(
    project: str,
    location: str,
    bq_model_name: str,
    vertex_model_id: str = "bqml_fraud_classifier_pipeline",
):
    """
    Deploy a Vertex AI Model Registry model to the most recently updated endpoint.

    Args:
        project: Project ID used to initialize the Vertex AI SDK.
        location: Region used to initialize the Vertex AI SDK.
        bq_model_name: BigQuery ML model name. Kept for interface compatibility.
            NOTE(review): it is not used for the lookup; the registry model is
            resolved through `vertex_model_id`.
        vertex_model_id: Vertex AI Model Registry ID (or full resource name) to
            deploy. Defaults to the `vertex_ai_model_id` used in the pipeline's
            CREATE MODEL statement.
    Returns:
        None
    Raises:
        RuntimeError: if no Vertex AI Endpoint exists in the project/location.
    """
    from google.cloud import aiplatform as vertex_ai

    # Initialize the SDK with the pipeline's project/region; otherwise the SDK
    # falls back to its own defaults.
    vertex_ai.init(project=project, location=location)

    model = vertex_ai.Model(model_name=vertex_model_id)

    # List ascending by update_time, so the last entry is the most recently
    # updated endpoint. In this pipeline, that is expected to be the one
    # EndpointCreateOp just created.
    endpoints = vertex_ai.Endpoint.list(order_by="update_time")
    if not endpoints:
        raise RuntimeError(
            f"No Vertex AI Endpoint found in project {project!r}, location {location!r}."
        )
    endpoint = endpoints[-1]

    model.deploy(
        endpoint=endpoint,
        min_replica_count=1,
        max_replica_count=1,
    )

    model.wait()

### Build and orchestrate a pipeline

Next, you will build a pipeline that orchestrates the steps described above: training the BQML model, evaluating it, and conditionally deploying it. Building ML pipelines that run on Vertex AI Pipelines can be done in two different ways:

* [Tensorflow Extended DSL](https://www.tensorflow.org/tfx/tutorials#getting-started-tutorials)
* [Kubeflow Pipelines DSL](https://www.kubeflow.org/docs/components/pipelines/v1/introduction/)

Based on your preference you can choose between the two options, but this notebook will only focus on Kubeflow Pipelines. 

Below you can set the model accuracy threshold used for the condition. 

In [None]:
# Minimum model accuracy: the pipeline creates the endpoint and deploys the model
# only when the evaluated accuracy is strictly greater than this value.
perf_threshold: float = 0.95

In the next cell, you will put together all the components into a pipeline function. In this example you will use the KFP DSL to define your end-to-end pipeline. For this you will use the `@dsl.pipeline` decorator.

In this example there is also a `with Condition` step that will only execute if the threshold is met (i.e. if the model evaluation accuracy is strictly above the threshold `perf_threshold`). 

In [None]:
@dsl.pipeline(
    name="bqml-pipeline-ff",
    description="Trains and deploys bqml model to detect fraud",
    pipeline_root=PIPELINE_ROOT,
)
def bqml_pipeline_ff(
    bq_table: str = BQ_TABLE_NAME,
    dataset: str = "tx",
    model: str = MODEL_NAME_PIPELINE,
    project: str = PROJECT_ID,
    region: str = REGION,
    endpoint_name: str = ENDPOINT_NAME,
):
    """
    Train, evaluate and conditionally deploy a BigQuery ML fraud classifier.

    Steps:
      1. Create (or replace) a logistic-regression model on `dataset.bq_table`
         and register it in Vertex AI Model Registry.
      2. Run an explain-predict job over the training table (its output is not
         consumed by later steps).
      3. Evaluate the model and extract its accuracy.
      4. If the accuracy is strictly above `perf_threshold`, create an endpoint
         and deploy the registered model to it.

    Args:
        bq_table: BigQuery training table name (without dataset prefix).
        dataset: BigQuery dataset holding the training table and the model.
        model: BigQuery ML model name to create or replace.
        project: Google Cloud project ID passed to every component.
        region: Location passed to every component.
        endpoint_name: Display name of the Vertex AI Endpoint to create.
    """

    bq_model_op = BigqueryCreateModelJobOp(
        project=project,
        location=region,
        # Use the pipeline parameters (not notebook globals) so that overriding
        # `dataset`, `model` or `bq_table` at submission time takes effect.
        query=f"""CREATE OR REPLACE MODEL `{dataset}.{model}` 
        OPTIONS (
            MODEL_TYPE='LOGISTIC_REG', 
            INPUT_LABEL_COLS=['tx_fraud'], 
            EARLY_STOP=TRUE,     
            model_registry='vertex_ai',
            vertex_ai_model_id='bqml_fraud_classifier_pipeline',
            vertex_ai_model_version_aliases=['logit', 'experimental']
        ) 
        AS SELECT * EXCEPT(timestamp, entity_type_customer, entity_type_terminal) FROM `{dataset}.{bq_table}`""",
    )

    _ = BigqueryExplainPredictModelJobOp(
        project=project,
        location=region,
        table_name=f"{dataset}.{bq_table}",
        model=bq_model_op.outputs["model"],
    )

    bq_evaluate_model_op = BigqueryEvaluateModelJobOp(
        project=project, location=region, model=bq_model_op.outputs["model"]
    ).after(bq_model_op)

    get_evaluation_model_metrics_op = (
        get_model_evaluation_metrics(bq_evaluate_model_op.outputs["evaluation_metrics"])
        .after(bq_evaluate_model_op)
        .set_display_name("plot evaluation metrics")
    )

    # Deploy only when the model clears the accuracy threshold (strict >).
    with Condition(
        get_evaluation_model_metrics_op.outputs["accuracy"] > perf_threshold,
        name="accuracy is above threshold",
    ):

        endpoint_create_op = EndpointCreateOp(
            project=project, location=region, display_name=endpoint_name
        )

        _ = upload_model_enpoint(
            project=project, location=region, bq_model_name=model
        ).after(endpoint_create_op)

### Submit Vertex AI Pipeline Job
Once you have authored your pipeline, to deploy it, you will first need to compile it into a JSON file, `bqml-pipeline-ff.json`, then upload the JSON file to Vertex AI Pipelines in order to submit it for execution. 

The first step is using the `compiler.Compiler()` to compile the pipeline `bqml_pipeline_ff()` into a JSON file. 

In [None]:
# Compile the pipeline function into the JSON spec submitted to Vertex AI Pipelines.
# `compiler` is already imported in the imports cell at the top of the notebook.
compiler.Compiler().compile(pipeline_func=bqml_pipeline_ff, package_path=PACKAGE_PATH)

Now you can go ahead and submit a Vertex AI Pipeline job, using `vertex_ai.PipelineJob()`. The output of the next cell will give you a URL that will take you to the Vertex AI Pipeline UI. There, you can monitor the progress of your pipeline run as it executes over the next several minutes. 

The execution of the pipeline will take some time, and your pipeline execution is completed once all of the steps are green. 

In [None]:
# Submit the compiled pipeline in the same region the components use.
job = vertex_ai.PipelineJob(
    display_name=PIPELINE_DISPLAY_NAME,
    template_path=PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=True,
    location=REGION,
)

# run() waits for the pipeline to finish (sync by default) and returns None,
# so its result is not printed. Progress is logged by the SDK.
job.run()

Below you can see the Vertex AI Pipeline execution you will visualize in the Cloud console.

<img src="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/misc/images/pipeline_run_example.png?raw=1">

### END

Now you can go to the next notebook `06_model_monitoring.ipynb`