vertex_ai/05_model_training_xgboost_formalization.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ur8xi4C7S06n"
},
"outputs": [],
"source": [
"# Copyright 2023 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fraudfinder - Model training and deployment using Vertex AI\n",
"\n",
"<table align=\"left\">\n",
" <td>\n",
" <a href=\"https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/05_model_training_xgboost_formalization.ipynb\">\n",
" <img src=\"https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg\" alt=\"Google Cloud Notebooks\">Open in Cloud Notebook\n",
" </a>\n",
" </td> \n",
" <td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/05_model_training_xgboost_formalization.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Colab logo\"> Open in Colab\n",
" </a>\n",
" </td>\n",
" <td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/05_model_training_xgboost_formalization.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\">\n",
" View on GitHub\n",
" </a>\n",
" </td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "tvgnzT1CKxrO"
},
"source": [
"## Overview\n",
"\n",
"[Fraudfinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the Fraudfinder labs, you will learn how to read historical bank transaction data stored in data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model.\n",
"\n",
"### Objective\n",
"\n",
"In the following notebook, you will learn how to:\n",
"\n",
"* Build a Vertex AI dataset\n",
"* Build a Docker container and train a custom XGBoost model using Vertex AI\n",
"* Evaluate the model locally\n",
"* Deploy the model to Vertex AI as an endpoint. \n",
"\n",
"This tutorial uses the following Google Cloud data analytics and services:\n",
"\n",
"- [BigQuery](https://cloud.google.com/bigquery/)\n",
"- [Vertex AI](https://cloud.google.com/vertex-ai/)\n",
"\n",
"### Costs \n",
"\n",
"This tutorial uses billable components of Google Cloud:\n",
"\n",
"* BigQuery\n",
"* Vertex AI\n",
"\n",
"Learn about [BigQuery Pricing](https://cloud.google.com/bigquery/pricing), [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing), and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Load configuration settings from the setup notebook\n",
"\n",
"Set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"GCP_PROJECTS = !gcloud config get-value project\n",
"PROJECT_ID = GCP_PROJECTS[0]\n",
"BUCKET_NAME = f\"{PROJECT_ID}-fraudfinder\"\n",
"config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py\n",
"print(config.n)\n",
"exec(config.n)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "XoEqT2Y4DJmf"
},
"source": [
"### Import libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "pRUOFELefqf1"
},
"outputs": [],
"source": [
"# General\n",
"import os\n",
"import sys\n",
"from typing import Union, List\n",
"import json\n",
"from datetime import datetime, timedelta\n",
"\n",
"# Data Preprocessing\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"# Model Training with Vertex AI\n",
"from google.cloud import bigquery\n",
"from google.cloud import aiplatform as vertex_ai\n",
"from google.cloud.aiplatform_v1 import ModelServiceClient\n",
"from google.cloud.aiplatform_v1.types import ListModelEvaluationsRequest\n",
"from google.protobuf.json_format import MessageToDict\n",
"from google.cloud.aiplatform import gapic as aip\n",
"from google.cloud import storage\n",
"\n",
"# Model Deployment and Evaluation\n",
"from sklearn.metrics import precision_recall_fscore_support\n",
"import xgboost as xgb\n",
"\n",
"\n",
"# Feature Store\n",
"from google.cloud import aiplatform as vertex_ai\n",
"from google.cloud.aiplatform import Featurestore, EntityType, Feature"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define constants"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# General\n",
"DATA_DIR = os.path.join(os.pardir, \"data\")\n",
"TRAIN_DATA_DIR = os.path.join(DATA_DIR, \"train\")\n",
"DATA_URI = f\"gs://{BUCKET_NAME}/data\"\n",
"TRAIN_DATA_URI = f\"{DATA_URI}/train\"\n",
"\n",
"# Feature Store\n",
"START_DATE_TRAIN = (datetime.today() - timedelta(days=1)).strftime(\"%Y-%m-%d\")\n",
"CUSTOMER_ENTITY = \"customer\"\n",
"TERMINAL_ENTITY = \"terminal\"\n",
"SERVING_FEATURE_IDS = {CUSTOMER_ENTITY: [\"*\"], TERMINAL_ENTITY: [\"*\"]}\n",
"READ_INSTANCES_TABLE = f\"ground_truth_{ID}\"\n",
"READ_INSTANCES_URI = f\"bq://{PROJECT_ID}.tx.{READ_INSTANCES_TABLE}\"\n",
"\n",
"# Training\n",
"EXPERIMENT_NAME = f\"fraudfinder-xgb-experiment-{ID}\"\n",
"TARGET = \"tx_fraud\"\n",
"\n",
"## Custom Training\n",
"DATASET_NAME = f\"sample_train-{ID}\"\n",
"TRAIN_JOB_NAME = f\"fraudfinder_xgb_train_frmlz-{ID}\"\n",
"MODEL_NAME = f\"{MODEL_NAME}_xgb_frmlz_{ID}\"\n",
"ENDPOINT_NAME = f\"{ENDPOINT_NAME}_xgb_frmlz_{ID}\"\n",
"MODEL_SERVING_IMAGE_URI = (\n",
" \"us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-1:latest\"\n",
")\n",
"IMAGE_REPOSITORY = f\"fraudfinder-{ID}\"\n",
"IMAGE_NAME = \"dask-xgb-classificator\"\n",
"IMAGE_TAG = \"v1\"\n",
"IMAGE_URI = f\"us-central1-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REPOSITORY}/{IMAGE_NAME}:{IMAGE_TAG}\"\n",
"TRAIN_COMPUTE = \"e2-standard-4\"\n",
"DEPLOY_COMPUTE = \"n1-standard-4\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "XoEqT2Y4DJmf"
},
"source": [
"### Initialize Vertex AI SDK and BigQuery Client for Python"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bq_client = bigquery.Client(project=PROJECT_ID, location=REGION)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"vertex_ai.init(\n",
" project=PROJECT_ID,\n",
" location=REGION,\n",
" staging_bucket=BUCKET_NAME,\n",
" experiment=EXPERIMENT_NAME,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Helper Functions\n",
"You will now run some helper functions that we will use throughout the notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_evaluation_metrics(client, model_resource_name):\n",
" model_evalution_request = ListModelEvaluationsRequest(parent=model_resource_name)\n",
" model_evaluation_list = client.list_model_evaluations(\n",
" request=model_evalution_request\n",
" )\n",
" metrics_strlist = []\n",
" for evaluation in model_evaluation_list:\n",
" metrics = MessageToDict(evaluation._pb.metrics)\n",
" return metrics\n",
"\n",
"\n",
"def gcs_list(gcs_uri):\n",
" obj_list = []\n",
" storage_client = storage.Client()\n",
" bucket, key = gcs_uri.replace(\"gs://\", \"\").split(\"/\", 1)\n",
" for blob in storage_client.list_blobs(bucket, prefix=key):\n",
" obj_list.append(\"gs://\" + bucket + \"/\" + str(blob.name))\n",
" return obj_list"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We're also using the BigQuery helper function. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Wrapper to use BigQuery client to run query/job, return job ID or result as DF\n",
"def run_bq_query(sql: str) -> Union[str, pd.DataFrame]:\n",
" \"\"\"\n",
" Run a BigQuery query and return the job ID or result as a DataFrame\n",
" Args:\n",
" sql: SQL query, as a string, to execute in BigQuery\n",
" Returns:\n",
" df: DataFrame of results from query, or error, if any\n",
" \"\"\"\n",
"\n",
" bq_client = bigquery.Client()\n",
"\n",
" # Try dry run before executing query to catch any errors\n",
" job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)\n",
" bq_client.query(sql, job_config=job_config)\n",
"\n",
" # If dry run succeeds without errors, proceed to run query\n",
" job_config = bigquery.QueryJobConfig()\n",
" client_result = bq_client.query(sql, job_config=job_config)\n",
"\n",
" job_id = client_result.job_id\n",
"\n",
" # Wait for query/job to finish running. then get & return data frame\n",
" df = client_result.result().to_arrow().to_pandas()\n",
" print(f\"Finished job_id: {job_id}\")\n",
" return df"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Fetching feature values for model training\n",
"\n",
"To fetch training data, we have to specify the following inputs to batch serving:\n",
"\n",
"- a file containing a \"query\", with the entities and timestamps for each label\n",
"- a list of feature values to fetch\n",
"- the destination location and format\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read-instance list\n",
"\n",
"In our case, we need a csv file with content formatted like the table below:\n",
"\n",
"|customer |terminal|timestamp |\n",
"|-----------------------------|--------|---------------------------------------------|\n",
"|xxx3859 |xxx8811 |2021-07-07 00:01:10 UTC |\n",
"|xxx4165 |xxx8810 |2021-07-07 00:01:55 UTC |\n",
"|xxx2289 |xxx2081 |2021-07-07 00:02:12 UTC |\n",
"|xxx3227 |xxx3011 |2021-07-07 00:03:23 UTC |\n",
"|xxx2819 |xxx6263 |2021-07-07 00:05:30 UTC |\n",
"\n",
"where the column names are the names of entities in Feature Store and the timestamps represents the time an event occurred."
]
},
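{
"cell_type": "markdown",
"metadata": {},
"source": [
"For illustration only, the sketch below shows how such a read-instances file could be assembled with pandas (the entity values are made up). In this notebook you will instead materialize the read-instances list as a BigQuery table in the next cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch only: a hand-built read-instances list. The real list is created as a\n",
"# BigQuery table in the next cell; the entity values here are illustrative.\n",
"import pandas as pd\n",
"\n",
"read_instances_example = pd.DataFrame(\n",
"    {\n",
"        \"customer\": [\"xxx3859\", \"xxx4165\"],\n",
"        \"terminal\": [\"xxx8811\", \"xxx8810\"],\n",
"        \"timestamp\": pd.to_datetime(\n",
"            [\"2021-07-07 00:01:10\", \"2021-07-07 00:01:55\"], utc=True\n",
"        ),\n",
"    }\n",
")\n",
"read_instances_example.to_csv(\"read_instances_example.csv\", index=False)\n",
"read_instances_example"
]
},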
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sql_query = f\"\"\"\n",
"CREATE OR REPLACE TABLE `{PROJECT_ID}.tx.{READ_INSTANCES_TABLE}` as (\n",
" SELECT\n",
" raw_tx.TX_TS AS timestamp,\n",
" raw_tx.CUSTOMER_ID AS customer,\n",
" raw_tx.TERMINAL_ID AS terminal,\n",
" raw_tx.TX_AMOUNT AS tx_amount,\n",
" raw_lb.TX_FRAUD AS tx_fraud,\n",
" FROM \n",
" tx.tx as raw_tx\n",
" LEFT JOIN \n",
" tx.txlabels as raw_lb\n",
" ON raw_tx.TX_ID = raw_lb.TX_ID\n",
" WHERE\n",
" DATE(raw_tx.TX_TS) = \"{START_DATE_TRAIN}\"\n",
" LIMIT 50000\n",
");\n",
"\"\"\"\n",
"\n",
"print(sql_query)\n",
"\n",
"run_bq_query(sql_query)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get Feature Store ID\n",
"Initiate the feature store you created in the `02_feature_engineering_batch.ipynb` notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" ff_feature_store = Featurestore(FEATURESTORE_ID)\n",
"except NameError:\n",
" print(f\"\"\"The feature store {FEATURESTORE_ID} does not exist!\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Fetch a sample of data and dump it into a bucket \n",
"In this section, we will use the batch serving of the Vertex AI Feature Store to prepare a dataset for training.\n",
"\n",
"You first have to set `uniformbucketlevelaccess` on the bucket. When you enable uniform bucket-level access on a bucket, Access Control Lists (ACLs) are disabled, and only bucket-level Identity and Access Management (IAM) permissions grant access to that bucket and the objects it contains. This is not the best practice for product workloads. We only use it to prevent issues when running the workshop. Read more about `uniformbucketlevelaccess` in our [documentation](https://cloud.google.com/storage/docs/uniform-bucket-level-access). "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!gsutil uniformbucketlevelaccess set on gs://{BUCKET_NAME}"
]
},
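{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you prefer the Python client over `gsutil`, the sketch below toggles the same setting with the `google-cloud-storage` library (equivalent to the command above; it reuses the `PROJECT_ID` and `BUCKET_NAME` variables defined earlier)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: enable uniform bucket-level access with the storage client instead of gsutil.\n",
"from google.cloud import storage\n",
"\n",
"bucket = storage.Client(project=PROJECT_ID).get_bucket(BUCKET_NAME)\n",
"bucket.iam_configuration.uniform_bucket_level_access_enabled = True\n",
"bucket.patch()\n",
"print(bucket.iam_configuration.uniform_bucket_level_access_enabled)"
]
},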
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next fetch a batch of data from the Vertex AI Feature Store. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ff_feature_store.batch_serve_to_gcs(\n",
" gcs_destination_output_uri_prefix=TRAIN_DATA_URI,\n",
" gcs_destination_type=\"csv\",\n",
" serving_feature_ids=SERVING_FEATURE_IDS,\n",
" read_instances_uri=READ_INSTANCES_URI,\n",
" pass_through_fields=[\"tx_amount\", \"tx_fraud\"],\n",
")\n",
"\n",
"!gsutil uniformbucketlevelaccess set off gs://{BUCKET_NAME}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now you will create a copy of the training data in your local notebook instance so that you can use it later for testing the model."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!gsutil ls $TRAIN_DATA_URI\n",
"!sudo gsutil cp -r $TRAIN_DATA_URI $TRAIN_DATA_DIR"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Exporting the features into cloud storage will generate a csv file. Let's list the local file:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!ls $TRAIN_DATA_DIR"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Building a fraud detection model using Vertex AI custom training"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Building a Vertex AI dataset\n",
"In this section, you will create a managed [Vertex AI dataset](https://cloud.google.com/vertex-ai/docs/training/using-managed-datasets). Vertex AI datasets can be used to train AutoML models or custom-trained models. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# retrieve list of local files\n",
"flist = !ls $TRAIN_DATA_DIR\n",
"obj_list = [f\"gs://{PROJECT_ID}-fraudfinder/data/train/{fname}\" for fname in flist]\n",
"obj_list"
]
},
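{
"cell_type": "markdown",
"metadata": {},
"source": [
"Alternatively, you can build the same list directly from Cloud Storage with the `gcs_list` helper defined earlier, as in the sketch below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: list the exported CSV files in Cloud Storage with the gcs_list helper\n",
"# instead of relying on the local copy.\n",
"[uri for uri in gcs_list(TRAIN_DATA_URI) if uri.endswith(\".csv\")]"
]
},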
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# retrieve list of local files\n",
"flist = !ls $TRAIN_DATA_DIR\n",
"obj_list = [f\"gs://{PROJECT_ID}-fraudfinder/data/train/{fname}\" for fname in flist]\n",
"\n",
"# create Vertex AI managed dataset\n",
"dataset = vertex_ai.TabularDataset.create(\n",
" display_name=DATASET_NAME,\n",
" gcs_source=obj_list[0],\n",
")\n",
"\n",
"print(\"Dataset:\", f\"{dataset.display_name}\")\n",
"print(\"Name: \\t\", f\"{dataset.resource_name}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train a custom model\n",
"\n",
"In this section, you will need to train an XGBoost model on Vertex AI custom training. Custom training on Vertex AI requires a container, which contains all of the necessary code, files, and code dependencies needed to train the model.\n",
"\n",
"#### Create the training job with XGBoost and Dask\n",
"\n",
"To perform custom training, you can use either a pre-built container or build your container. In this notebook we will being use XGBoost with the Dask framework, and so we will need to build a custom container for XGBoost and use it to train a model with the Vertex AI custom training service.\n",
"\n",
"You will use Dask. Dask is a parallel computing library built on Python. Dask allows easy management of distributed workers and excels at handling large distributed data science workflows. The implementation in XGBoost originates from dask-xgboost with some extended functionalities and a different interface. \n",
"\n",
"##### Vertex AI and containers\n",
"The first step is to write your training code. Then, you will need to write a Dockerfile and build a container image based on it. The following cell writes our code into `train_xgb.py`, the module for training an XGBClassifier. We will copy this code into our container to run through the Vertex AI training service.\n",
"\n",
"A custom container is a Docker image that you create to run your training application. By running your machine learning (ML) training job in a custom container, you can use ML frameworks, non-ML dependencies, libraries, and binaries that are not otherwise supported on Vertex AI. You can read more in our [documentation](https://cloud.google.com/vertex-ai/docs/training/containers-overview). "
]
},
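{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before packaging the full training module, here is an optional, minimal sketch of the Dask + XGBoost interface it uses, run on synthetic data. The toy data and parameters are illustrative, and the cell assumes Dask and XGBoost are installed in this notebook kernel (they are installed in the training container in any case)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: the DaskXGBClassifier interface used by train_xgb.py, on toy data.\n",
"import numpy as np\n",
"import pandas as pd\n",
"import dask.dataframe as dask_df\n",
"import xgboost as xgb\n",
"from dask.distributed import LocalCluster, Client\n",
"\n",
"rng = np.random.default_rng(seed=8)\n",
"toy_df = pd.DataFrame({\"f0\": rng.normal(size=1000), \"f1\": rng.normal(size=1000)})\n",
"toy_df[\"label\"] = (toy_df[\"f0\"] + toy_df[\"f1\"] > 0).astype(int)\n",
"toy_ddf = dask_df.from_pandas(toy_df, npartitions=2)\n",
"\n",
"cluster = LocalCluster(n_workers=1, threads_per_worker=1)\n",
"client = Client(cluster)\n",
"\n",
"toy_model = xgb.dask.DaskXGBClassifier(n_estimators=10, max_depth=3)\n",
"toy_model.client = client\n",
"toy_model.fit(toy_ddf[[\"f0\", \"f1\"]], toy_ddf[\"label\"])\n",
"print(toy_model.predict(toy_ddf[[\"f0\", \"f1\"]]).compute()[:5])\n",
"\n",
"client.close()\n",
"cluster.close()"
]
},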
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create a folder for all container-related files\n",
"!mkdir -p -m 777 build_training"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile build_training/train_xgb.py\n",
"\n",
"\"\"\"\n",
"train_xgb.py is the module for training a XGBClassifier pipeline\n",
"\"\"\"\n",
"\n",
"# Libraries --------------------------------------------------------------------------------------------------------------------------\n",
"\n",
"import argparse\n",
"import numpy as np\n",
"import os\n",
"import json\n",
"import logging\n",
"from pathlib import Path\n",
"import dask.dataframe as dask_df\n",
"from dask.distributed import LocalCluster, Client\n",
"import xgboost as xgb\n",
"from sklearn.metrics import (roc_curve, confusion_matrix, average_precision_score, f1_score, \n",
" log_loss, precision_score, recall_score)\n",
"\n",
"# Variables --------------------------------------------------------------------------------------------------------------------------\n",
"\n",
"## Read environmental variables\n",
"def gcs_path_to_local_path(old_path):\n",
" new_path = old_path.replace(\"gs://\", \"/gcs/\")\n",
" return new_path\n",
"\n",
"TRAINING_DATA_PATH = gcs_path_to_local_path(os.environ[\"AIP_TRAINING_DATA_URI\"])\n",
"TEST_DATA_PATH = gcs_path_to_local_path(os.environ[\"AIP_TEST_DATA_URI\"])\n",
"MODEL_DIR = gcs_path_to_local_path(os.environ[\"AIP_MODEL_DIR\"])\n",
"MODEL_PATH = MODEL_DIR + \"model.bst\"\n",
"\n",
"## Training variables\n",
"LABEL_COLUMN = \"tx_fraud\"\n",
"UNUSED_COLUMNS = [\"timestamp\",\"entity_type_customer\",\"entity_type_terminal\"]\n",
"DATA_SCHEMA = {\n",
"\"timestamp\" : \"object\",\n",
"\"tx_amount\": \"float64\",\n",
"\"tx_fraud\": \"Int64\",\n",
"\"entity_type_customer\": \"Int64\",\n",
"\"customer_id_nb_tx_1day_window\": \"Int64\",\n",
"\"customer_id_nb_tx_7day_window\": \"Int64\",\n",
"\"customer_id_nb_tx_14day_window\": \"Int64\",\n",
"\"customer_id_avg_amount_1day_window\": \"float64\",\n",
"\"customer_id_avg_amount_7day_window\": \"float64\",\n",
"\"customer_id_avg_amount_14day_window\": \"float64\",\n",
"\"customer_id_nb_tx_15min_window\": \"Int64\",\n",
"\"customer_id_avg_amount_15min_window\": \"float64\",\n",
"\"customer_id_nb_tx_30min_window\": \"Int64\",\n",
"\"customer_id_avg_amount_30min_window\": \"float64\",\n",
"\"customer_id_nb_tx_60min_window\": \"Int64\",\n",
"\"customer_id_avg_amount_60min_window\": \"float64\",\n",
"\"entity_type_terminal\": \"Int64\",\n",
"\"terminal_id_nb_tx_1day_window\": \"Int64\",\n",
"\"terminal_id_nb_tx_7day_window\": \"Int64\",\n",
"\"terminal_id_nb_tx_14day_window\": \"Int64\",\n",
"\"terminal_id_risk_1day_window\": \"float64\",\n",
"\"terminal_id_risk_7day_window\": \"float64\",\n",
"\"terminal_id_risk_14day_window\": \"float64\",\n",
"\"terminal_id_nb_tx_15min_window\": \"Int64\",\n",
"\"terminal_id_avg_amount_15min_window\": \"float64\",\n",
"\"terminal_id_nb_tx_30min_window\": \"Int64\",\n",
"\"terminal_id_avg_amount_30min_window\": \"float64\",\n",
"\"terminal_id_nb_tx_60min_window\": \"Int64\",\n",
"\"terminal_id_avg_amount_60min_window\": \"float64\"\n",
"}\n",
"\n",
"# Helpers -----------------------------------------------------------------------------------------------------------------------------\n",
"def get_args():\n",
" parser = argparse.ArgumentParser()\n",
"\n",
" # Data files arguments\n",
" parser.add_argument(\"--bucket\", dest=\"bucket\", type=str,\n",
" required=True, help=\"Bucket uri\")\n",
" parser.add_argument(\"--max_depth\", dest=\"max_depth\",\n",
" default=6, type=int,\n",
" help=\"max_depth value.\")\n",
" parser.add_argument(\"--eta\", dest=\"eta\",\n",
" default=0.4, type=float,\n",
" help=\"eta.\")\n",
" parser.add_argument(\"--gamma\", dest=\"gamma\",\n",
" default=0.0, type=float,\n",
" help=\"eta value\")\n",
" \n",
" return parser.parse_args()\n",
"\n",
"def resample(df, replace, frac=1, random_state = 8):\n",
" shuffled_df = df.sample(frac=frac, replace=replace, random_state=random_state)\n",
" return shuffled_df\n",
"\n",
"def preprocess(df):\n",
" df = df.drop(columns=UNUSED_COLUMNS)\n",
"\n",
" # Drop rows with NaN\"s\n",
" df = df.dropna()\n",
"\n",
" # Convert integer valued (numeric) columns to floating point\n",
" numeric_columns = df.select_dtypes([\"float32\", \"float64\"]).columns\n",
" numeric_format = {col:\"float32\" for col in numeric_columns}\n",
" df = df.astype(numeric_format)\n",
"\n",
" return df\n",
"\n",
"def evaluate_model(model, x_true, y_true):\n",
" y_true = y_true.compute()\n",
" \n",
" #calculate metrics\n",
" metrics={}\n",
" \n",
" y_score = model.predict_proba(x_true)[:, 1]\n",
" y_score = y_score.compute()\n",
" fpr, tpr, thr = roc_curve(\n",
" y_true=y_true, y_score=y_score, pos_label=True\n",
" )\n",
" fpr_list = fpr.tolist()[::1000]\n",
" tpr_list = tpr.tolist()[::1000]\n",
" thr_list = thr.tolist()[::1000]\n",
"\n",
" y_pred = model.predict(x_true)\n",
" y_pred.compute()\n",
" c_matrix = confusion_matrix(y_true, y_pred)\n",
" \n",
" avg_precision_score = round(average_precision_score(y_true, y_score), 3)\n",
" f1 = round(f1_score(y_true, y_pred), 3)\n",
" lg_loss = round(log_loss(y_true, y_pred), 3)\n",
" prec_score = round(precision_score(y_true, y_pred), 3)\n",
" rec_score = round(recall_score(y_true, y_pred), 3)\n",
" \n",
" metrics[\"fpr\"] = [round(f, 3) for f in fpr_list]\n",
" metrics[\"tpr\"] = [round(f, 3) for f in tpr_list]\n",
" metrics[\"thrs\"] = [round(f, 3) for f in thr_list]\n",
" metrics[\"confusion_matrix\"] = c_matrix.tolist()\n",
" metrics[\"avg_precision_score\"] = avg_precision_score\n",
" metrics[\"f1_score\"] = f1\n",
" metrics[\"log_loss\"] = lg_loss\n",
" metrics[\"precision_score\"] = prec_score\n",
" metrics[\"recall_score\"] = rec_score\n",
" \n",
" return metrics\n",
"\n",
"\n",
"def main():\n",
" args = get_args()\n",
" \n",
" # variables\n",
" bucket = gcs_path_to_local_path(args.bucket)\n",
" deliverable_uri = (Path(bucket)/\"deliverables\")\n",
" metrics_uri = (deliverable_uri/\"metrics.json\")\n",
"\n",
" # read data\n",
" train_df = dask_df.read_csv(TRAINING_DATA_PATH, dtype=DATA_SCHEMA)\n",
" test_df = dask_df.read_csv(TEST_DATA_PATH, dtype=DATA_SCHEMA)\n",
" \n",
" # preprocessing\n",
" preprocessed_train_df = preprocess(train_df)\n",
" preprocessed_test_df = preprocess(test_df)\n",
" \n",
" # downsampling\n",
" train_nfraud_df = preprocessed_train_df[preprocessed_train_df[LABEL_COLUMN]==0]\n",
" train_fraud_df = preprocessed_train_df[preprocessed_train_df[LABEL_COLUMN]==1]\n",
" train_nfraud_downsample = resample(train_nfraud_df,\n",
" replace=True, \n",
" frac=len(train_fraud_df)/len(train_df))\n",
" ds_preprocessed_train_df = dask_df.multi.concat([train_nfraud_downsample, train_fraud_df])\n",
" \n",
" # target, features split\n",
" x_train = ds_preprocessed_train_df[ds_preprocessed_train_df.columns.difference([LABEL_COLUMN])]\n",
" y_train = ds_preprocessed_train_df.loc[:, LABEL_COLUMN].astype(int)\n",
" x_true = preprocessed_test_df[preprocessed_test_df.columns.difference([LABEL_COLUMN])]\n",
" y_true = preprocessed_test_df.loc[:, LABEL_COLUMN].astype(int)\n",
" \n",
" # train model\n",
" cluster = LocalCluster()\n",
" client = Client(cluster)\n",
" model = xgb.dask.DaskXGBClassifier(objective=\"reg:logistic\", eval_metric=\"logloss\")\n",
" model.client = client \n",
" model.fit(x_train, y_train, eval_set=[(x_true, y_true)])\n",
" if not Path(MODEL_DIR).exists():\n",
" Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)\n",
" model.save_model(MODEL_PATH)\n",
" \n",
" #generate metrics\n",
" metrics = evaluate_model(model, x_true, y_true)\n",
" if not Path(deliverable_uri).exists():\n",
" Path(deliverable_uri).mkdir(parents=True, exist_ok=True)\n",
" with open(metrics_uri, \"w\") as file:\n",
" json.dump(metrics, file, sort_keys = True, indent = 4)\n",
" file.close()\n",
" \n",
"if __name__ == \"__main__\":\n",
" main()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define a custom image for Dask model training\n",
"\n",
"Now you will build a custom container. By running your training job in a custom container, you can use any ML framework, non-ML dependencies, libraries, and binaries. Next you will package your training code into a Docker container image, push the container image to Artifact Registry, and create a custom job on Vertex AI, which will use the container image on Artifact Registry. As the evolution of Container Registry, Artifact Registry is a single place for your organization to manage container images and language packages. It's fullly intergrated with the Vertex AI platform. You can read more in our [documentation](https://cloud.google.com/artifact-registry). "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create image repository\n",
"!gcloud artifacts repositories create $IMAGE_REPOSITORY --repository-format=docker --location=us-central1 --description=\"FraudFinder Docker Image repository\"\n",
"\n",
"# List repositories under the project\n",
"!gcloud artifacts repositories list\n",
"\n",
"# Get info on the repository\n",
"!gcloud artifacts repositories describe $IMAGE_REPOSITORY --location=us-central1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the follow cell to allow this notebook to push to Artifact Registry"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!gcloud auth configure-docker us-central1-docker.pkg.dev -q"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next you need to write your Dockerfile in order to create your container. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile build_training/Dockerfile\n",
"# Specifies base image and tag\n",
"FROM python:3.7\n",
"WORKDIR /root\n",
"\n",
"# Installs additional packages\n",
"RUN pip install gcsfs numpy pandas scikit-learn dask distributed xgboost --upgrade\n",
"\n",
"# Copies the trainer code to the docker image.\n",
"COPY ./train_xgb.py /root/train_xgb.py\n",
"\n",
"# Sets up the entry point to invoke the trainer.\n",
"ENTRYPOINT [\"python3\", \"train_xgb.py\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, build and push the Docker container. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Build and push Docker container\n",
"!docker build -t $IMAGE_URI ./build_training/\n",
"!docker push $IMAGE_URI\n",
"\n",
"print(\"Done\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Start a custom training job on Vertex AI\n",
"Now that you have created your custom container, you will create a training job on Vertex AI. This will create a custom training job, load our dataset and register the model to Vertex AI Model Registry after the training job is successfully completed. Learn more about the creaton of custom jobs [here](https://cloud.google.com/vertex-ai/docs/training/create-custom-job)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"job = vertex_ai.CustomContainerTrainingJob(\n",
" display_name=TRAIN_JOB_NAME,\n",
" container_uri=IMAGE_URI,\n",
" model_serving_container_image_uri=MODEL_SERVING_IMAGE_URI,\n",
")\n",
"\n",
"parameters = {\"MAX_DEPTH\": 4, \"ETA\": 0.3, \"GAMMA\": 0.1}\n",
"\n",
"CMDARGS = [\n",
" f\"--bucket={BUCKET_NAME}\",\n",
" \"--max_depth=\" + str(parameters[\"MAX_DEPTH\"]),\n",
" \"--eta=\" + str(parameters[\"ETA\"]),\n",
" \"--gamma=\" + str(parameters[\"GAMMA\"]),\n",
"]\n",
"\n",
"model = job.run(\n",
" dataset=dataset,\n",
" model_display_name=MODEL_NAME,\n",
" args=CMDARGS,\n",
" replica_count=1,\n",
" machine_type=TRAIN_COMPUTE,\n",
" accelerator_count=0,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"While the model is training, you can visit the model URL, or go to the console page for [Vertex AI training jobs](https://console.cloud.google.com/vertex-ai/training/training-pipelines) to track its progress."
]
},
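{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the job completes, `train_xgb.py` also writes an evaluation summary to `gs://{BUCKET_NAME}/deliverables/metrics.json`, derived from the `--bucket` argument passed above. The sketch below reads it back, assuming the file was written to that path."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: read the metrics.json written by train_xgb.py. The path is assumed from\n",
"# the --bucket argument passed to the training job above.\n",
"import json\n",
"\n",
"from google.cloud import storage\n",
"\n",
"metrics_blob = (\n",
"    storage.Client(project=PROJECT_ID)\n",
"    .bucket(BUCKET_NAME)\n",
"    .blob(\"deliverables/metrics.json\")\n",
")\n",
"print(json.dumps(json.loads(metrics_blob.download_as_text()), indent=2))"
]
},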
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"#### Evaluate the model locally"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before you can run the model via an endpoint, you need to transform the data so that the model can perform a prediction on that."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"LABEL_COLUMN = \"tx_fraud\"\n",
"UNUSED_COLUMNS = [\"timestamp\", \"entity_type_customer\", \"entity_type_terminal\"]\n",
"NA_VALUES = [\"NA\", \".\"]\n",
"\n",
"\n",
"def preprocess(df):\n",
" \"\"\"Converts categorical features to numeric. Removes unused columns.\n",
"\n",
" Args:\n",
" df: Pandas df with raw data\n",
"\n",
" Returns:\n",
" df with preprocessed data\n",
" \"\"\"\n",
" df = df.drop(columns=UNUSED_COLUMNS)\n",
"\n",
" # Drop rows with NaN's\n",
" df = df.dropna()\n",
"\n",
" # Convert integer valued (numeric) columns to floating point\n",
" numeric_columns = df.select_dtypes([\"int32\", \"float32\", \"float64\"]).columns\n",
" df[numeric_columns] = df[numeric_columns].astype(\"float32\")\n",
"\n",
" dummy_columns = list(df.dtypes[df.dtypes == \"category\"].index)\n",
" df = pd.get_dummies(df, columns=dummy_columns)\n",
"\n",
" return df\n",
"\n",
"\n",
"# test set\n",
"train_sample_path = os.path.join(TRAIN_DATA_DIR, \"000000000000.csv\")\n",
"df_test = pd.read_csv(train_sample_path)\n",
"preprocessed_test_Data = preprocess(df_test)\n",
"\n",
"x_test = preprocessed_test_Data[\n",
" preprocessed_test_Data.columns.drop(LABEL_COLUMN).to_list()\n",
"].values\n",
"y_test = preprocessed_test_Data.loc[:, LABEL_COLUMN].astype(int)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next you will copy the model artifact to the local directory to evaluate the model localy before deploying the model:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!gsutil cp -r $model.uri ."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now it's time to test the model."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bst = xgb.Booster()\n",
"bst.load_model(\"./model/model.bst\")\n",
"xgtest = xgb.DMatrix(x_test)\n",
"y_pred_prob = bst.predict(xgtest)\n",
"y_pred = y_pred_prob.round().astype(int)\n",
"y_pred_prob[0:10]\n",
"precision_recall_fscore_support(y_test.values, y_pred, average=\"weighted\")"
]
},
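{
"cell_type": "markdown",
"metadata": {},
"source": [
"For a quick sanity check, you can also look at the confusion matrix on the same local sample (a small sketch using scikit-learn)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: confusion matrix of the locally loaded booster on the sample data.\n",
"from sklearn.metrics import confusion_matrix\n",
"\n",
"confusion_matrix(y_test.values, y_pred)"
]
},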
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Deploy the model\n",
"Before you use your model to make predictions, you need to deploy it to an Endpoint. You can do this by calling the deploy function on the Model resource. This will do two things:\n",
"\n",
"- create an Endpoint resource\n",
"- deploy the Model resource to the Endpoint resource\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Compute instance type\n",
"DEPLOY_COMPUTE = \"n1-standard-4\"\n",
"\n",
"# Percentage of traffic that the model will receive in the endpoint\n",
"TRAFFIC_SPLIT = {\"0\": 100}\n",
"\n",
"# Parameters to configure the minimum and maximum nodes during autoscaling\n",
"MIN_NODES = 1\n",
"MAX_NODES = 1\n",
"\n",
"\n",
"endpoint = model.deploy(\n",
" deployed_model_display_name=MODEL_NAME,\n",
" traffic_split=TRAFFIC_SPLIT,\n",
" machine_type=DEPLOY_COMPUTE,\n",
" accelerator_count=0,\n",
" min_replica_count=MIN_NODES,\n",
" max_replica_count=MAX_NODES,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Test the deployed model (Make an online prediction request)\n",
"Send an online prediction request to your deployed model. To make sure your deployed model is working, test it out by sending a request to the endpoint.\n",
"\n",
"Let's first get a test data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"payload = {\"instances\": x_test[:2].tolist()}\n",
"\n",
"# In case you want to test it in the console\n",
"import json\n",
"\n",
"with open(\"predictions.json\", \"w\", encoding=\"utf-8\") as f:\n",
" json.dump(payload, f, ensure_ascii=False, indent=4)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"endpoint.predict(instances=payload[\"instances\"])"
]
},
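{
"cell_type": "markdown",
"metadata": {},
"source": [
"The prebuilt XGBoost serving container returns one fraud probability per instance. The sketch below turns those probabilities into labels with a 0.5 threshold (the threshold is chosen purely for illustration)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: convert the returned probabilities into fraud / not-fraud labels.\n",
"response = endpoint.predict(instances=payload[\"instances\"])\n",
"labels = [int(score >= 0.5) for score in response.predictions]\n",
"list(zip(response.predictions, labels))"
]
},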
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now that we understand we packaged our XGBoost model and started a custom training job on Vertex AI we can take the ML workflow and formalize it into a Vertex AI Pipeline.\n",
"\n",
"You can continue with the next Notebook: `06_formalization.ipynb`."
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [],
"name": "notebook_template.ipynb",
"toc_visible": true
},
"environment": {
"kernel": "python3",
"name": "common-cpu.m111",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m111"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}