bqml/05_model_training_pipeline_formalization.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "zYb6C1p_Qnla"
},
"outputs": [],
"source": [
"# Copyright 2023 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "50a5a5935df7"
},
"source": [
"# FraudFinder - BigQuery ML - Model training pipeline formalization\n",
"\n",
"<table align=\"left\">\n",
" <td>\n",
" <a href=\"https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/bqml/05_model_training_pipeline_formalization.ipynb\">\n",
" <img src=\"https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg\" alt=\"Google Cloud Notebooks\">Open in Cloud Notebook\n",
" </a>\n",
" </td> \n",
" <td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/bqml/05_model_training_pipeline_formalization.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Colab logo\"> Open in Colab\n",
" </a>\n",
" </td>\n",
" <td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/bqml/05_model_training_pipeline_formalization.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\">\n",
" View on GitHub\n",
" </a>\n",
" </td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "e42cde8f0c0b"
},
"source": [
"## Overview\n",
"\n",
"[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the Fraudfinder labs, you will learn how to read historical bank transaction data stored in data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model.\n",
"\n",
"\n",
"### Objective\n",
"\n",
"In this tutorial, you will learn how to:\n",
"\n",
"- Train a Logistic Regression model using BigQuery ML (BQML)\n",
"- Evaluate the model BQML model\n",
"- Run an evaluation job \n",
"- Register the model on Vertex AI Model Registry\n",
"- Create a Vertex AI Endpoint and upload the BQML to the Endpoint \n",
"\n",
"This tutorial uses the following Google Cloud services:\n",
"- [BigQuery](https://cloud.google.com/bigquery/)\n",
"- [BigQueryML](https://cloud.google.com/bigquery-ml/)\n",
"- [Vertex AI](https://cloud.google.com/vertex-ai/)\n",
"\n",
"The steps performed include:\n",
"- Build a custom component for the Pipeline. \n",
"- Using Kubeflow Pipeline (KFP) DSL to build an end-to-end pipeline\n",
"- Compile the Pipeline\n",
"- Submit and execute the pipeline\n",
"\n",
"### Costs \n",
"This tutorial uses billable components of Google Cloud:\n",
"* BigQuery\n",
"* BigQuery ML\n",
"* Vertex AI\n",
"* Google Cloud Storage\n",
"\n",
"Learn about [BigQuery Pricing](https://cloud.google.com/bigquery/pricing), [BigQuery ML pricing](https://cloud.google.com/bigquery-ml/pricing), [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing), and use the [Pricing Calculator](https://cloud.google.com/products/calculator/)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "64df07cdafe3"
},
"source": [
"### Load configuration settings from the setup notebook\n",
"\n",
"First you will need to set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb notebook`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "eaa773408baa"
},
"outputs": [],
"source": [
"GCP_PROJECTS = !gcloud config get-value project\n",
"PROJECT_ID = GCP_PROJECTS[0]\n",
"BUCKET_NAME = f\"{PROJECT_ID}-fraudfinder\"\n",
"config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py\n",
"print(config.n)\n",
"exec(config.n)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "e9953481a2d9"
},
"source": [
"### Import libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "95bff07e93d7"
},
"outputs": [],
"source": [
"import json\n",
"from typing import NamedTuple, Optional\n",
"from datetime import datetime, timedelta\n",
"\n",
"import google.cloud.aiplatform as vertex_ai\n",
"# kfp and cloud components\n",
"import kfp\n",
"from google.cloud import bigquery\n",
"from google_cloud_pipeline_components.types import artifact_types\n",
"from google_cloud_pipeline_components.v1.bigquery import (\n",
" BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp,\n",
" BigqueryExplainPredictModelJobOp, BigqueryExportModelJobOp,\n",
" BigqueryPredictModelJobOp, BigqueryQueryJobOp)\n",
"from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,\n",
" ModelDeployOp)\n",
"from kfp import dsl\n",
"from kfp.v2 import compiler\n",
"from kfp.v2.dsl import HTML, Artifact, Condition, Input, Output, component"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "98ae5e4872e0"
},
"source": [
"### Define constants"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "3395ec7bd2bf"
},
"outputs": [],
"source": [
"# General\n",
"START_DATE_TRAIN = (datetime.today() - timedelta(days=1)).strftime(\"%Y-%m-%d\")\n",
"END_DATE_TRAIN = (datetime.today() - timedelta(days=1)).strftime(\"%Y-%m-%d\")\n",
"SERVING_FEATURE_IDS = {\"customer\": [\"*\"], \"terminal\": [\"*\"]}\n",
"READ_INSTANCES_TABLE = f\"ground_truth_{END_DATE_TRAIN}\"\n",
"READ_INSTANCES_URI = f\"bq://{PROJECT_ID}.tx.{READ_INSTANCES_TABLE}\"\n",
"BQ_TABLE_NAME = f\"train_table_{END_DATE_TRAIN.replace('-', '')}\"\n",
"TRAIN_TABLE_URI = f\"bq://{PROJECT_ID}.tx.{BQ_TABLE_NAME}\"\n",
"PIPELINE_ROOT = f\"gs://{BUCKET_NAME}/pipeline_root/ff\"\n",
"PIPELINE_DISPLAY_NAME = \"bqml-pipeline-ff\"\n",
"MODEL_NAME_PIPELINE = f\"{MODEL_NAME}_pipeline\"\n",
"PACKAGE_PATH = \"bqml-pipeline-ff.json\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "670b5423ac1c"
},
"source": [
"### Initialize Vertex AI SDK\n",
"Initialize the Vertex AI SDK for Python for your project and corresponding bucket."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "a6db9519e754"
},
"outputs": [],
"source": [
"vertex_ai.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "y5rtQ2sgQnlh"
},
"source": [
"### Set project folder\n",
"Set the path where we will store the Kubeflow Pipelines Component. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "058ed9bd5b7b"
},
"outputs": [],
"source": [
"KFP_COMPONENTS_PATH = \"components\"\n",
"! mkdir -m 777 -p {KFP_COMPONENTS_PATH}\n",
"! mkdir -m 777 -p {KFP_COMPONENTS_PATH}/custom_components"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3445d5c03ec8"
},
"source": [
"### Create Custom Components\n",
"In this notebook we will be using a mix of pre-built BigQuery ML components and custom components. The difference is:\n",
"\n",
"* Pre-built components are official Google Cloud Pipeline Components ([GCPC](https://cloud.google.com/vertex-ai/docs/pipelines/gcpc-list)). The Google Cloud Pipeline Components (GCPC) SDK provides a set of prebuilt components that are production quality, consistent, performant, and easy to use in Vertex AI Pipelines. \n",
"* The custom component, as you will build in the cell below, is typically a component authored by a data scientist or ML engineer. This means that you have more control over the code running in the component (container). In this case it's a [Python-function-based component](https://www.kubeflow.org/docs/components/pipelines/v1/sdk/python-function-components/). \n",
"\n",
"In the next two cells you will build two custom components:\n",
"* The first component will take the evaluation metrics of your model and return it so that we can store it as metadata. \n",
"* The second component will deploy our model from the Vertex AI Model Registry into a Vertex AI Endpoint. \n",
"\n",
"The pre-built components provides the benefit of being easy to use, while custom components provide more flexibility beyond the capabilities of pre-built components. "
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "11c2073bf209"
},
"source": [
"#### Build a custom component that will fetch the eval metric \n",
"This custom component will retrieve the evaluation metric and it will be used downstream and stored as metadata. This way we can keep track of our model performance. In order to take a Python function and turn it into a component we will use the `@component` decorator. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "41ec3d21cfd1"
},
"outputs": [],
"source": [
"@component(\n",
" base_image=\"python:3.8-slim\",\n",
" packages_to_install=[\"jinja2\", \"pandas\", \"matplotlib\"],\n",
" output_component_file=f\"{KFP_COMPONENTS_PATH}/custom_components/build_bq_evaluate_metrics.yaml\",\n",
")\n",
"def get_model_evaluation_metrics(\n",
" metrics_in: Input[Artifact],\n",
") -> NamedTuple(\"Outputs\", [(\"accuracy\", float)]):\n",
" \"\"\"\n",
" Get the accuracy from the metrics\n",
" Args:\n",
" metrics_in: metrics artifact\n",
" Returns:\n",
" accuracy: accuracy\n",
" \"\"\"\n",
"\n",
" import pandas as pd\n",
"\n",
" def get_column_names(header):\n",
" \"\"\"\n",
" Helper function to get the column names from the metrics table.\n",
" Args:\n",
" header: header\n",
" Returns:\n",
" column_names: column names\n",
" \"\"\"\n",
" header_clean = header.replace(\"_\", \" \")\n",
" header_abbrev = \"\".join([h[0].upper() for h in header_clean.split()])\n",
" header_prettied = f\"{header_clean} ({header_abbrev})\"\n",
" return header_prettied\n",
"\n",
" # Extract rows and schema from metrics artifact\n",
" rows = metrics_in.metadata[\"rows\"]\n",
" schema = metrics_in.metadata[\"schema\"]\n",
"\n",
" # Convert into a tabular format\n",
" columns = [metrics[\"name\"] for metrics in schema[\"fields\"] if \"name\" in metrics]\n",
" records = []\n",
" for row in rows:\n",
" records.append([dl[\"v\"] for dl in row[\"f\"]])\n",
"\n",
" metrics = pd.DataFrame.from_records(records, columns=columns).astype(float).round(3)\n",
"\n",
" metrics = metrics.reset_index()\n",
"\n",
" # Create the HTML artifact used for the metrics\n",
" pretty_columns = list(\n",
" map(\n",
" lambda h: get_column_names(h)\n",
" if h != columns[0]\n",
" else h.replace(\"_\", \" \").capitalize(),\n",
" columns,\n",
" )\n",
" )\n",
"\n",
" # Create metrics dictionary for the model\n",
" accuracy = round(float(metrics.accuracy), 3)\n",
" component_outputs = NamedTuple(\"Outputs\", [(\"accuracy\", float)])\n",
"\n",
" return component_outputs(accuracy)"
]
},
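{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before wiring this component into the pipeline, it can help to see the shape of the metadata it parses. The next cell is a minimal, local sanity check of the same rows/schema parsing logic against a hand-written sample payload (the numbers below are illustrative, not real BQML output)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sample mimicking the rows/schema metadata parsed by the component.\n",
"# The values are made up; real values come from the BQML evaluation artifact.\n",
"import pandas as pd\n",
"\n",
"sample_schema = {\"fields\": [{\"name\": \"accuracy\"}, {\"name\": \"roc_auc\"}]}\n",
"sample_rows = [{\"f\": [{\"v\": \"0.987\"}, {\"v\": \"0.991\"}]}]\n",
"\n",
"columns = [field[\"name\"] for field in sample_schema[\"fields\"] if \"name\" in field]\n",
"records = [[cell[\"v\"] for cell in row[\"f\"]] for row in sample_rows]\n",
"sample_metrics = pd.DataFrame.from_records(records, columns=columns).astype(float).round(3)\n",
"\n",
"print(sample_metrics)\n",
"print(\"accuracy:\", float(sample_metrics[\"accuracy\"].iloc[0]))"
]
},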
{
"cell_type": "markdown",
"metadata": {
"id": "6f7fbb50013d"
},
"source": [
"#### Custom component that will deploy our model from the Vertex AI Model Registry into a Vertex AI Endpoint. \n",
"This custom component will take our BQML model that is registered on Vertex AI Model Registry and deploy it into our Vertex AI Endpoint. This will be the last step in our end-to-end pipeline. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "6ef38ce8c6e1"
},
"outputs": [],
"source": [
"@component(\n",
" base_image=\"python:3.8-slim\",\n",
" packages_to_install=[\"google-cloud-aiplatform\"],\n",
")\n",
"def upload_model_enpoint(\n",
" project: str,\n",
" location: str,\n",
" bq_model_name: str,\n",
"):\n",
" \"\"\"\n",
" Uploads the model to Vertex AI\n",
" Args:\n",
" project: Project ID\n",
" location: Region\n",
" bq_model_name: A fully-qualified model resource name or model ID.\n",
" Example: \"projects/123/locations/us-central1/models/456\" or\n",
" \"456\" when project and location are initialized or passed.\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" from google.cloud import aiplatform as vertex_ai\n",
"\n",
" model = vertex_ai.Model(model_name='bqml_fraud_classifier_pipeline')\n",
"\n",
" endpoint = vertex_ai.Endpoint.list(order_by=\"update_time\")\n",
" endpoint = endpoint[-1]\n",
"\n",
" model.deploy(\n",
" endpoint=endpoint,\n",
" min_replica_count=1,\n",
" max_replica_count=1,\n",
" )\n",
"\n",
" model.wait()\n",
"\n",
" return"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "c7871e7b8855"
},
"source": [
"### Build and orchestrate a pipeline\n",
"\n",
"Next we will build a pipeline that will execute and orchestrate the following steps. Building ML Pipelines that run on Vertex AI pipelines can be done in two different ways:\n",
"\n",
"* [Tensorflow Extended DSL](https://www.tensorflow.org/tfx/tutorials#getting-started-tutorials)\n",
"* [Kubeflow Pipelines DSL](https://www.kubeflow.org/docs/components/pipelines/v1/introduction/)\n",
"\n",
"Based on your preference you can choose between the two options, but this notebook will only focus on Kubeflow Pipelines. \n",
"\n",
"Below you can set the model accuracy threshold used for the condition. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "e9ff3ea1b4ef"
},
"outputs": [],
"source": [
"perf_threshold = 0.95"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "536aabf22af3"
},
"source": [
"In the next cell below, you will put together all the components into a pipeline function. In this example you will use the KFP DSL to define your end-to-end pipeline. For this you will use the `@ds.pipeline` decorator.\n",
"\n",
"In this example there is also a `with Condition` step that will only execute if the threshold is met (i.e. if the model evaluation metric is at or above the threshold `perf_threshold`). "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "9315e32d159f"
},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" name=\"bqml-pipeline-ff\",\n",
" description=\"Trains and deploys bqml model to detect fraud\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
")\n",
"def bqml_pipeline_ff(\n",
" bq_table: str = BQ_TABLE_NAME,\n",
" dataset: str = \"tx\",\n",
" model: str = MODEL_NAME_PIPELINE,\n",
" project: str = PROJECT_ID,\n",
" region: str = REGION,\n",
" endpoint_name: str = ENDPOINT_NAME,\n",
"):\n",
"\n",
" bq_model_op = BigqueryCreateModelJobOp(\n",
" project=project,\n",
" location=region,\n",
" query=f\"\"\"CREATE OR REPLACE MODEL `tx.{MODEL_NAME_PIPELINE}` \n",
" OPTIONS (\n",
" MODEL_TYPE='LOGISTIC_REG', \n",
" INPUT_LABEL_COLS=['tx_fraud'], \n",
" EARLY_STOP=TRUE, \n",
" model_registry='vertex_ai',\n",
" vertex_ai_model_id='bqml_fraud_classifier_pipeline',\n",
" vertex_ai_model_version_aliases=['logit', 'experimental']\n",
" ) \n",
" AS SELECT * EXCEPT(timestamp, entity_type_customer, entity_type_terminal) FROM `tx.{BQ_TABLE_NAME}`\"\"\",\n",
" )\n",
"\n",
" _ = BigqueryExplainPredictModelJobOp(\n",
" project=project,\n",
" location=region,\n",
" table_name=f\"{dataset}.{bq_table}\",\n",
" model=bq_model_op.outputs[\"model\"],\n",
" )\n",
"\n",
" bq_evaluate_model_op = BigqueryEvaluateModelJobOp(\n",
" project=project, location=region, model=bq_model_op.outputs[\"model\"]\n",
" ).after(bq_model_op)\n",
"\n",
" get_evaluation_model_metrics_op = (\n",
" get_model_evaluation_metrics(bq_evaluate_model_op.outputs[\"evaluation_metrics\"])\n",
" .after(bq_evaluate_model_op)\n",
" .set_display_name(\"plot evaluation metrics\")\n",
" )\n",
"\n",
" # Check the model performance.\n",
" with Condition(\n",
" get_evaluation_model_metrics_op.outputs[\"accuracy\"] > perf_threshold,\n",
" name=\"accuracy is above threshold\",\n",
" ):\n",
"\n",
" endpoint_create_op = EndpointCreateOp(\n",
" project=project, location=region, display_name=ENDPOINT_NAME\n",
" )\n",
"\n",
" _ = upload_model_enpoint(\n",
" project=project, location=region, bq_model_name=model\n",
" ).after(endpoint_create_op)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "747926eede65"
},
"source": [
"### Submit Vertex AI Pipeline Job\n",
"Once you have authored your pipeline, to deploy it, you will first need to compile it into a JSON file, `bqml-pipeline-ff.json`, then upload the JSON file to Vertex AI Pipelines in order to submit it for execution. \n",
"\n",
"The first step is using the `compiler.Compiler()` to compile the pipeline `bqml_pipeline_ff()` into a JSON file. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "8861c343d8d6"
},
"outputs": [],
"source": [
"from kfp.v2 import compiler\n",
"\n",
"compiler.Compiler().compile(pipeline_func=bqml_pipeline_ff, package_path=PACKAGE_PATH)"
]
},
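{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can inspect the compiled pipeline package to confirm that compilation succeeded. The cell below simply prints the top-level keys of the generated JSON file; the exact keys depend on your KFP SDK version."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Peek at the compiled pipeline spec (top-level keys only)\n",
"with open(PACKAGE_PATH) as f:\n",
"    pipeline_spec = json.load(f)\n",
"print(list(pipeline_spec.keys()))"
]
},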
{
"cell_type": "markdown",
"metadata": {
"id": "d49e56061b30"
},
"source": [
"Now you can go ahead and submit a Vertex AI Pipeline job, using `vertex_ai.PipelineJob()`. The output of the next cell will give you a URL that will take you the Vertex AI Pipeline UI. There, you can monitor the progress of your pipeline run as it executes over the next several minutes. \n",
"\n",
"The execution of the pipeline will take some time, and your pipeline execution is completed once all of the steps are green. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "dc70ea573a35"
},
"outputs": [],
"source": [
"job = vertex_ai.PipelineJob(\n",
" display_name=PIPELINE_DISPLAY_NAME,\n",
" template_path=PACKAGE_PATH,\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=True,\n",
")\n",
"\n",
"print(job.run())"
]
},
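{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since `job.run()` blocks until the run finishes, you can also confirm the final state programmatically. The cell below is a minimal check using the `job` object created above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# PIPELINE_STATE_SUCCEEDED indicates that all pipeline steps completed successfully\n",
"print(job.state)"
]
},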
{
"cell_type": "markdown",
"metadata": {
"id": "6fIbNgDIQnlj"
},
"source": [
"Below you can see the Vertex AI Pipeline execution you will visualize in the Cloud console.\n",
"\n",
"<img src=\"https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/misc/images/pipeline_run_example.png?raw=1\">"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_twzPbfLQnlj"
},
"source": [
"### END\n",
"\n",
"Now you can go to the next notebook `06_model_monitoring.ipynb`"
]
}
],
"metadata": {
"colab": {
"name": "05_model_training_pipeline_formalization.ipynb",
"provenance": [],
"toc_visible": true
},
"environment": {
"kernel": "python3",
"name": "common-cpu.m104",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m104"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}