In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Fraudfinder - ML Pipeline

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/06_formalization.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/06_formalization.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/06_formalization.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[Fraudfinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the Fraudfinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using the feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with the feature store, and monitor your model.

### Objective

This notebook shows how to use Feature Store, Pipelines and Model Monitoring for building an end-to-end demo using both components defined in `google_cloud_pipeline_components` and custom components. 

This lab uses the following Google Cloud services and resources:

- [Vertex AI](https://cloud.google.com/vertex-ai/)
- [BigQuery](https://cloud.google.com/bigquery/)

Steps performed in this notebook:

* Create a Vertex AI Pipeline to orchestrate and automate the ML workflow

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* BigQuery

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Load configuration settings from the setup notebook

Set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook.

In [None]:
# Resolve the active gcloud project; `!cmd` captures the command's output as a list of lines.
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
BUCKET_NAME = f"{PROJECT_ID}-fraudfinder"
# Fetch the shared settings file (Python source) stored in the project bucket.
config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
# `.n` joins the captured lines with newlines. Echo the settings, then execute them.
# This presumably defines names used later in this notebook that are not set here
# (e.g. REGION, ID, FEATURESTORE_ID, MODEL_NAME, ENDPOINT_NAME) — confirm against
# 00_environment_setup.ipynb.
# NOTE(review): exec runs whatever code is in the bucket; only use a bucket you control.
print(config.n)
exec(config.n)

### Import libraries and define constants

#### Libraries
Next you will import the libraries needed for this notebook. 

Note that currently this notebook uses KFP SDK v1, whereas the environment includes KFP v2. As an interim solution, we will downgrade KFP and the Google Cloud Pipeline Components in order to use the v1 code here as-is. See the [KFP migration guide](https://www.kubeflow.org/docs/components/pipelines/v2/migration/) for more details on moving from v1 to v2. 

In [None]:
! pip install --upgrade 'google-cloud-pipeline-components==0.3.0'

In [None]:
# General
import os
import sys
import random
from datetime import datetime, timedelta
import json

# Vertex Pipelines
from typing import NamedTuple
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import (
    Artifact,
    Dataset,
    Input,
    InputPath,
    Model,
    Output,
    OutputPath,
    Metrics,
    ClassificationMetrics,
    Condition,
    component,
)
from kfp.v2 import compiler

from google.cloud import aiplatform as vertex_ai
from google_cloud_pipeline_components import aiplatform as vertex_ai_components
from kfp.v2.google.client import AIPlatformClient as VertexAIClient

In [None]:
# Show which KFP SDK version ended up installed after pinning the pipeline components.
print(f"kfp version: {kfp.__version__}")

#### Variables

In [None]:
# Components variables
BASE_IMAGE = "python:3.7"
COMPONENTS_DIR = os.path.join(os.curdir, "pipelines", "components")
INGEST_FEATURE_STORE = f"{COMPONENTS_DIR}/ingest_feature_store_{ID}.yaml"
EVALUATE = f"{COMPONENTS_DIR}/evaluate_{ID}.yaml"

# Pipeline variables
PIPELINE_NAME = f"fraud-finder-xgb-pipeline-{ID}"
PIPELINE_DIR = os.path.join(os.curdir, "pipelines")
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipelines"
PIPELINE_PACKAGE_PATH = f"{PIPELINE_DIR}/pipeline_{ID}.json"

# Feature Store component variables
BQ_DATASET = "tx"
READ_INSTANCES_TABLE = f"ground_truth_{ID}"
READ_INSTANCES_URI = f"bq://{PROJECT_ID}.{BQ_DATASET}.{READ_INSTANCES_TABLE}"

# Dataset component variables
DATASET_NAME = f"fraud_finder_dataset_{ID}"

# MODEL_NAME and ENDPOINT_NAME start from the values loaded from the shared config.
# The pipeline suffix is appended only if it is not already present, so re-running
# this cell does not keep extending the names.
XGB_PIPELINE_SUFFIX = f"_xgb_pipeline_{ID}"

# Training component variables
JOB_NAME = f"fraudfinder-train-xgb-{ID}"
if not MODEL_NAME.endswith(XGB_PIPELINE_SUFFIX):
    MODEL_NAME = f"{MODEL_NAME}{XGB_PIPELINE_SUFFIX}"
# NOTE(review): not referenced by the pipeline below, which trains with IMAGE_URI.
CONTAINER_URI = "us-docker.pkg.dev/vertex-ai/training/xgboost-cpu.1-1:latest"
MODEL_SERVING_IMAGE_URI = (
    "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-1:latest"
)
ARGS = json.dumps(["--bucket", f"gs://{BUCKET_NAME}"])
IMAGE_REPOSITORY = f"fraudfinder-{ID}"
IMAGE_NAME = "dask-xgb-classificator"
IMAGE_TAG = "v1"
IMAGE_URI = f"us-central1-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REPOSITORY}/{IMAGE_NAME}:{IMAGE_TAG}"  # TODO: get it from config

# Evaluation component variables
METRICS_URI = f"gs://{BUCKET_NAME}/deliverables/metrics.json"
# Deploy only when the average precision score is strictly greater than this value.
AVG_PR_THRESHOLD = 0.2
AVG_PR_CONDITION = "avg_pr_condition"

# Endpoint variables
if not ENDPOINT_NAME.endswith(XGB_PIPELINE_SUFFIX):
    ENDPOINT_NAME = f"{ENDPOINT_NAME}{XGB_PIPELINE_SUFFIX}"

#### Initialize the Vertex AI SDK
Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
# Vertex AI SDK
# staging_bucket must be a gs:// URI. BUCKET_NAME holds only the bucket name,
# so the scheme is added here.
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=f"gs://{BUCKET_NAME}",
)

In [None]:
# Turn on uniform bucket-level access for the project bucket (access controlled by
# bucket-level IAM rather than per-object ACLs).
# NOTE(review): this modifies the bucket's configuration, not just this notebook.
!gsutil ubla set on gs://{BUCKET_NAME}

#### Create directories 
Create a directory for your pipeline and pipeline components. 

In [None]:
# Create the local folders for the compiled pipeline and the component YAML specs.
# os.makedirs creates intermediate directories, does nothing if they already exist,
# and uses default permissions instead of the world-writable 777 mode.
os.makedirs(PIPELINE_DIR, exist_ok=True)
os.makedirs(COMPONENTS_DIR, exist_ok=True)

### Create an end-to-end pipeline and execute it on Vertex AI Pipelines

We will build a pipeline that you will execute using [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction). Vertex AI Pipelines helps you to automate, monitor, and govern your ML systems by orchestrating your ML workflow in a serverless manner, and storing your workflow's artifacts using Vertex ML Metadata. Authoring ML Pipelines that run on Vertex AI pipelines can be done in two different ways:

* [Tensorflow Extended](https://www.tensorflow.org/tfx/guide)
* [Kubeflow Pipelines SDK](https://kubeflow-pipelines.readthedocs.io/en/1.8.13/)

Based on your preference you can choose between the two options. This notebook will only focus on Kubeflow Pipelines.

If you are not familiar with authoring pipelines for Vertex AI Pipelines, we suggest the following resources:
* [Introduction to Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)
* [Build a Pipeline in Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

### Define Custom Components for your pipeline

We will use a mix of prebuilt (Google Cloud Pipeline Components) and custom components in this notebook. The difference is:

* Prebuilt components are official [Google Cloud Pipeline Components](https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction) (GCPC). The GCPC Library provides a set of prebuilt components that are production quality, consistent, performant, and easy to use in Vertex AI Pipelines.
* Custom components, like the ones you will build in the cells below, are typically authored by a data scientist or ML engineer. This means you have more control over the component (container) code. In this case, they are Python-function-based components. You also have the option to build a component yourself by packaging code into a container.

In the following two cells, you will build two custom components:

* Feature Store component.
* Evaluation component.

#### Define feature store component

Notice that the component assumes that the BigQuery table containing the entity IDs and timestamps to serve (the read-instances "query" table) has already been created.

# Feature Store
Next you will build a custom component using the [KFP SDK](https://kubeflow-pipelines.readthedocs.io/en/1.8.13/). Here you will take a Python function and create a component out of it. This component will take features from the Vertex AI Feature Store and output them on Google Cloud Storage (GCS). 

In [None]:
@component(
    output_component_file=INGEST_FEATURE_STORE,
    base_image=BASE_IMAGE,
    packages_to_install=["google-cloud-aiplatform==1.21.0"],
)
def ingest_features_gcs(
    project_id: str,
    region: str,
    bucket_name: str,
    feature_store_id: str,
    read_instances_uri: str,
) -> NamedTuple("Outputs", [("snapshot_uri_paths", str),],):
    """Export customer and terminal features from Vertex AI Feature Store to GCS as CSV.

    Runs a Feature Store batch-serve job for the entities listed in
    ``read_instances_uri``. The output is written under
    ``{bucket_name}/data/snapshots/<timestamp>``, and the exported files are then
    listed through the ``/gcs/`` Cloud Storage FUSE mount.

    Args:
        project_id: Google Cloud project ID that owns the feature store.
        region: Region of the feature store.
        bucket_name: Destination bucket as a ``gs://`` URI. Its netloc is used to
            build the ``/gcs/`` FUSE path.
        feature_store_id: ID of the Vertex AI Feature Store to read from.
        read_instances_uri: BigQuery URI (``bq://project.dataset.table``) of the
            table listing the entities to serve.

    Returns:
        Outputs with ``snapshot_uri_paths``: a JSON-encoded, sorted list of the
        exported CSV files as ``gs://`` URIs.

    Raises:
        Exception: any error raised by the batch-serve request is printed and re-raised.
        RuntimeError: if the export produced no CSV files.
    """
    # Libraries --------------------------------------------------------------------------------------------------------------------------
    from datetime import datetime
    import glob
    import json
    import urllib.parse  # `import urllib` alone does not guarantee urllib.parse is loaded
    from typing import NamedTuple

    # Feature Store
    from google.cloud.aiplatform import Featurestore

    # Variables --------------------------------------------------------------------------------------------------------------------------
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    bucket = urllib.parse.urlsplit(bucket_name).netloc
    # The same snapshot folder, addressed two ways: a gs:// URI for the export job,
    # and a /gcs/ FUSE path for listing the exported files from inside the container.
    export_uri = f"{bucket_name}/data/snapshots/{timestamp}"
    export_uri_path = f"/gcs/{bucket}/data/snapshots/{timestamp}"
    customer_entity = "customer"
    terminal_entity = "terminal"
    # ["*"] is the wildcard that selects every feature of the entity type.
    serving_feature_ids = {customer_entity: ["*"], terminal_entity: ["*"]}

    print(timestamp)
    print(bucket)
    print(export_uri)
    print(export_uri_path)
    print(customer_entity)
    print(terminal_entity)
    print(serving_feature_ids)

    # Main -------------------------------------------------------------------------------------------------------------------------------

    ## Define the feature store resource path
    feature_store_resource_path = (
        f"projects/{project_id}/locations/{region}/featurestores/{feature_store_id}"
    )
    print("Feature Store: \t", feature_store_resource_path)

    ## Run batch job request. Failures are logged and re-raised so the pipeline stops
    ## here, instead of failing later on an empty snapshot list.
    try:
        ff_feature_store = Featurestore(feature_store_resource_path)
        ff_feature_store.batch_serve_to_gcs(
            gcs_destination_output_uri_prefix=export_uri,
            gcs_destination_type="csv",
            serving_feature_ids=serving_feature_ids,
            read_instances_uri=read_instances_uri,
            pass_through_fields=["tx_fraud", "tx_amount"],
        )
    except Exception as error:
        print(error)
        raise

    # Store metadata: list the exported CSVs via FUSE and report them as gs:// URIs.
    fuse_prefix = "/gcs/"
    snapshot_pattern = f"{export_uri_path}/*.csv"
    snapshot_files = sorted(glob.glob(snapshot_pattern))
    # Rewrite only the leading FUSE prefix, never a "/gcs/" occurring later in the path.
    snapshot_files_fmt = [
        "gs://" + p[len(fuse_prefix):] if p.startswith(fuse_prefix) else p
        for p in snapshot_files
    ]
    snapshot_files_string = json.dumps(snapshot_files_fmt)

    print(snapshot_pattern)
    print(snapshot_files)
    print(snapshot_files_fmt)
    print(snapshot_files_string)

    if not snapshot_files:
        raise RuntimeError(
            f"Feature Store export produced no CSV files matching {snapshot_pattern}"
        )

    component_outputs = NamedTuple(
        "Outputs",
        [
            ("snapshot_uri_paths", str),
        ],
    )

    return component_outputs(snapshot_files_string)

#### Define an evaluate custom component
Next you will build a custom component that will evaluate our XGBoost model. This component will output `avg_precision_score` so that it can be used downstream for validating the model before deployment. 

In [None]:
@component(output_component_file=EVALUATE)
def evaluate_model(
    model_in: Input[Artifact],
    metrics_uri: str,
    meta_metrics: Output[Metrics],
    graph_metrics: Output[ClassificationMetrics],
    model_out: Output[Artifact],
) -> NamedTuple("Outputs", [("metrics_thr", float),],):
    """Log the model's evaluation metrics and return the score used to gate deployment.

    Reads the metrics JSON stored at ``metrics_uri`` through the ``/gcs/`` Cloud
    Storage FUSE mount. Scalar metrics are logged to ``meta_metrics``, the ROC curve
    and confusion matrix to ``graph_metrics``, and descriptive metadata is attached
    to ``model_out``.

    Args:
        model_in: Trained model artifact. Its contents are not read here.
        metrics_uri: ``gs://`` URI of the metrics JSON file.
        meta_metrics: Output receiving scalar metrics.
        graph_metrics: Output receiving the ROC curve and confusion matrix.
        model_out: Output artifact whose metadata is annotated.

    Returns:
        Outputs with ``metrics_thr``: the average precision score as a float.
    """
    # Libraries --------------------------------------------------------------------------------------------------------------------------
    import json

    # Inside the container, gs:// objects are reachable under the /gcs/ mount.
    fuse_path = metrics_uri.replace("gs://", "/gcs/")
    class_names = ["not fraud", "fraud"]

    with open(fuse_path, mode="r") as fh:
        scores = json.load(fh)

    # Pull every value up front, so a missing key fails before anything is logged.
    roc_fpr, roc_tpr, roc_thresholds = scores["fpr"], scores["tpr"], scores["thrs"]
    confusion = scores["confusion_matrix"]
    scalar_scores = {
        name: scores[name]
        for name in (
            "avg_precision_score",
            "f1_score",
            "log_loss",
            "precision_score",
            "recall_score",
        )
    }

    for name, value in scalar_scores.items():
        meta_metrics.log_metric(name, value)
    graph_metrics.log_roc_curve(roc_fpr, roc_tpr, roc_thresholds)
    graph_metrics.log_confusion_matrix(class_names, confusion)

    # Descriptive model metadata shown alongside the artifact.
    model_out.metadata.update(
        {
            "framework": "xgb.dask",
            "type": "DaskXGBClassifier",
            "model function": "classification",
            "modified by": "author",
        }
    )

    Outputs = NamedTuple("Outputs", [("metrics_thr", float)])
    return Outputs(float(scalar_scores["avg_precision_score"]))

#### Author your pipeline
Next you will author the pipeline using the KFP SDK. This pipeline consists of the following steps:

* Ingest features
* Create Vertex AI Dataset
* Train XGBoost model
* Evaluate model
* Condition
* Create endpoint
* Deploy model into endpoint

In [None]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name=PIPELINE_NAME,
)
def pipeline(
    project_id: str = PROJECT_ID,
    region: str = REGION,
    bucket_name: str = f"gs://{BUCKET_NAME}",
    feature_store_id: str = FEATURESTORE_ID,
    read_instances_uri: str = READ_INSTANCES_URI,
    replica_count: int = 1,
    machine_type: str = "n1-standard-4",
    train_split: float = 0.8,
    test_split: float = 0.1,
    val_split: float = 0.1,
    metrics_uri: str = METRICS_URI,
    thold: float = AVG_PR_THRESHOLD,
):
    """End-to-end Fraudfinder XGBoost pipeline.

    The steps are: export features, create a dataset, train, evaluate, and then
    create an endpoint and deploy to it, but only if the evaluation score clears
    ``thold``.

    Args:
        project_id: Google Cloud project ID.
        region: Region used for the dataset, training job and endpoint.
        bucket_name: ``gs://`` bucket URI used for exports, staging and training output.
        feature_store_id: Vertex AI Feature Store to read features from.
        read_instances_uri: BigQuery URI of the entities/timestamps to serve.
        replica_count: Training replica count, also used as the minimum number of
            deployment replicas.
        machine_type: Machine type for both training and serving.
        train_split: Fraction of the data used for training.
        test_split: Fraction of the data used for testing.
        val_split: Fraction of the data used for validation.
        metrics_uri: ``gs://`` URI of the metrics JSON read by the evaluation step.
        thold: Deployment happens only if the average precision score is strictly
            greater than this value.
    """
    # Ingest data from featurestore
    ingest_features_op = ingest_features_gcs(
        project_id=project_id,
        region=region,
        bucket_name=bucket_name,
        feature_store_id=feature_store_id,
        read_instances_uri=read_instances_uri,
    )

    # Create the dataset from the exported CSVs. Pass location explicitly, like the
    # training job does, so the component's default location is not used instead of `region`.
    dataset_create_op = vertex_ai_components.TabularDatasetCreateOp(
        project=project_id,
        location=region,
        display_name=DATASET_NAME,
        gcs_source=ingest_features_op.outputs["snapshot_uri_paths"],
    ).after(ingest_features_op)

    # custom training job component - script
    train_model_op = vertex_ai_components.CustomContainerTrainingJobRunOp(
        display_name=JOB_NAME,
        model_display_name=MODEL_NAME,
        container_uri=IMAGE_URI,
        staging_bucket=bucket_name,
        dataset=dataset_create_op.outputs["dataset"],
        base_output_dir=bucket_name,
        args=ARGS,
        replica_count=replica_count,
        machine_type=machine_type,
        training_fraction_split=train_split,
        validation_fraction_split=val_split,
        test_fraction_split=test_split,
        model_serving_container_image_uri=MODEL_SERVING_IMAGE_URI,
        project=project_id,
        location=region,
    ).after(dataset_create_op)

    # Evaluate component. model_in ties this step to the trained model.
    evaluate_model_op = evaluate_model(
        model_in=train_model_op.outputs["model"], metrics_uri=metrics_uri
    ).after(train_model_op)

    # Deploy only if avg_precision_score is strictly greater than the threshold.
    with Condition(
        evaluate_model_op.outputs["metrics_thr"] > thold, name=AVG_PR_CONDITION
    ):
        # Create the endpoint in the same region as the model.
        create_endpoint_op = vertex_ai_components.EndpointCreateOp(
            display_name=ENDPOINT_NAME, project=project_id, location=region
        ).after(evaluate_model_op)

        # deploy the model
        custom_model_deploy_op = vertex_ai_components.ModelDeployOp(
            model=train_model_op.outputs["model"],
            endpoint=create_endpoint_op.outputs["endpoint"],
            deployed_model_display_name=MODEL_NAME,
            dedicated_resources_machine_type=machine_type,
            dedicated_resources_min_replica_count=replica_count,
        ).after(create_endpoint_op)

#### Compile and run the pipeline
After authoring the pipeline you can use the compiler to compile the pipeline. 

In [None]:
# Turn the pipeline function into a job spec file that Vertex AI Pipelines can run.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path=PIPELINE_PACKAGE_PATH,
    pipeline_func=pipeline,
)

Next you can use the Vertex AI SDK to create a job on Vertex AI Pipelines. 

In [None]:
# Build a run definition from the compiled spec. Caching is off, so every step
# executes again on each run.
pipeline_job = vertex_ai.PipelineJob(
    template_path=PIPELINE_PACKAGE_PATH,
    display_name=PIPELINE_NAME,
    enable_caching=False,
    pipeline_root=PIPELINE_ROOT,
)

In [None]:
# Submit the run; the first run may take ~20 minutes.
pipeline_job.run(
    sync=True,  # block this cell until the run finishes
)

### END

Now you can go to the next notebook `07_deployment.ipynb` and explore deployment using Cloud Build