{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ur8xi4C7S06n"
},
"outputs": [],
"source": [
"# Copyright 2022 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "JAPoU8Sm5E6e"
},
"source": [
"# Vertex AI Pipelines: Loan eligibility prediction using `google-cloud-pipeline-components` and Spark ML\n",
"\n",
"<table align=\"left\">\n",
"\n",
" <td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_dataproc_tabular.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Colab logo\"> Run in Colab\n",
" </a>\n",
" </td>\n",
" <td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_dataproc_tabular.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\">\n",
" View on GitHub\n",
" </a> \n",
" </td>\n",
" <td>\n",
"<a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/google_cloud_pipeline_components_dataproc_tabular.ipynb\" target='_blank'>\n",
" <img src=\"https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32\" alt=\"Vertex AI logo\">\n",
" Open in Vertex AI Workbench\n",
" </a>\n",
" </td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "962e636b5cee"
},
"source": [
"**_NOTE_**: This notebook has been tested in the following environment:\n",
"\n",
"* Python version = 3.9"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "aa2a647ffd9f"
},
"source": [
"## Overview\n",
"\n",
"This notebook shows how to build a Spark ML pipeline using Spark MLlib and DataprocPySparkBatchOp component to determine the customer eligibility for a loan from a banking company. In particular, the pipeline covers a Spark MLib pipeline, from data preprocessing to hyperparameter tuning of a random forest classifier which predicts the probability of a customer being eligible for a loan. \n",
"\n",
"Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) and [Dataproc components](https://cloud.google.com/vertex-ai/docs/pipelines/dataproc-component)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cf28f8819c81"
},
"source": [
"### Objective\n",
"\n",
"In this notebook, you learn how to build a Vertex AI pipeline and train a random-forest model using Spark ML for loan-eligibility classification problem. \n",
"\n",
"This tutorial uses the following Google Cloud ML services and resources:\n",
"\n",
"- Vertex AI Datasets\n",
"- Vertex AI Pipelines\n",
"- Vertex AI Training\n",
"\n",
"\n",
"The steps performed include:\n",
"\n",
"* Use the `DataprocPySparkBatchOp` to preprocess data.\n",
"* Create a Vertex AI dataset resource on the training data.\n",
"* Train a random forest model using PySpark.\n",
"* Build a Vertex AI pipeline and run the training job.\n",
"* Use the Spark serving image in order to deploy a Spark model on Vertex AI Endpoint."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "137eb6bf1091"
},
"source": [
"### Dataset\n",
"\n",
"The dataset is a preprocessed version of the [loan eligibility dataset](https://datasetsearch.research.google.com/search?src=2&query=Loan%20Eligible%20Dataset&docid=L2cvMTFsajJrM3EzcA%3D%3D)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "tvgnzT1CKxrO"
},
"source": [
"### Costs \n",
"\n",
"This tutorial uses billable components of Google Cloud:\n",
"\n",
"* Vertex AI\n",
"* Cloud Storage\n",
"* Dataproc Serverless\n",
"\n",
"Learn about [Vertex AI\n",
"pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage\n",
"pricing](https://cloud.google.com/storage/pricing), [Dataproc\n",
"pricing](https://cloud.google.com/dataproc/pricing) and use the [Pricing\n",
"Calculator](https://cloud.google.com/products/calculator/)\n",
"to generate a cost estimate based on your projected usage."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "install_aip:mbsdk"
},
"source": [
"## Installation\n",
"\n",
"Install the packages required for executing this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2b4ef9b72d43"
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# (optional) update gcloud if needed\n",
"if os.getenv(\"IS_TESTING\"):\n",
" ! gcloud components update --quiet\n",
"\n",
"\n",
"! pip3 install --upgrade --quiet google-cloud-aiplatform==1.30.1 \\\n",
" kfp==1.8.14 \\\n",
" google-cloud-pipeline-components==1.0.33 --no-warn-conflicts"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "58707a750154"
},
"source": [
"### Colab only: Uncomment the following cell to restart the kernel."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "f200f10a1da3"
},
"outputs": [],
"source": [
"# Automatically restart kernel after installs so that your environment can access the new packages\n",
"# import IPython\n",
"\n",
"# app = IPython.Application.instance()\n",
"# app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BF1j6f9HApxa"
},
"source": [
"## Before you begin\n",
"\n",
"### Set up your Google Cloud project\n",
"\n",
"**The following steps are required, regardless of your notebook environment.**\n",
"\n",
"1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.\n",
"\n",
"2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).\n",
"\n",
"3. [Enable the Artifact Registry, Cloud Build, Container Registry, Dataproc and Vertex AI APIs](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,artifactregistry.googleapis.com,cloudbuild.googleapis.com,containerregistry.googleapis.com,dataproc.googleapis.com,aiplatform.googleapis.com).\n",
"\n",
"4. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "WReHDGG5g0XY"
},
"source": [
"#### Set your project ID\n",
"\n",
"**If you don't know your project ID**, try the following:\n",
"* Run `gcloud config list`.\n",
"* Run `gcloud projects list`.\n",
"* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "oM1iC_MfAts1"
},
"outputs": [],
"source": [
"PROJECT_ID = \"[your-project-id]\" # @param {type:\"string\"}\n",
"\n",
"# Set the project id\n",
"! gcloud config set project {PROJECT_ID}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "region"
},
"source": [
"#### Region\n",
"\n",
"You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "region"
},
"outputs": [],
"source": [
"REGION = \"us-central1\" # @param {type: \"string\"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "06571eb4063b"
},
"source": [
"#### UUID\n",
"\n",
"If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a uuid for each instance session, and append it onto the name of resources you create in this tutorial."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "697568e92bd6"
},
"outputs": [],
"source": [
"import random\n",
"import string\n",
"\n",
"\n",
"# Generate a uuid of a specifed length(default=8)\n",
"def generate_uuid(length: int = 8) -> str:\n",
" return \"\".join(random.choices(string.ascii_lowercase + string.digits, k=length))\n",
"\n",
"\n",
"UUID = generate_uuid()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "sBCra4QMA2wR"
},
"source": [
"### Authenticate your Google Cloud account\n",
"\n",
"Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.\n",
"\n",
"**1. Vertex AI Workbench**\n",
"* Do nothing as you are already authenticated.\n",
"\n",
"**2. Local JupyterLab instance, uncomment and run:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "254614fa0c46"
},
"outputs": [],
"source": [
"# ! gcloud auth login"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ef21552ccea8"
},
"source": [
"**3. Colab, uncomment and run:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "603adbbf0532"
},
"outputs": [],
"source": [
"# from google.colab import auth\n",
"# auth.authenticate_user()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "f6b2ccc891ed"
},
"source": [
"**4. Service account or other**\n",
"* See how to grant Cloud Storage permissions to your service account at https://cloud.google.com/storage/docs/gsutil/commands/iam#ch-examples."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "b109ba134099"
},
"source": [
"### Enable Google Cloud services\n",
"\n",
"Enable the following services in your project if not already done:\n",
"\n",
"* Artifact Registry\n",
"* Cloud Build\n",
"* Container Registry\n",
"* Dataproc\n",
"* Vertex AI\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "1faa3afe3686"
},
"outputs": [],
"source": [
"! gcloud services enable \\\n",
" artifactregistry.googleapis.com \\\n",
" cloudbuild.googleapis.com \\\n",
" containerregistry.googleapis.com \\\n",
" dataproc.googleapis.com \\\n",
" aiplatform.googleapis.com"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "zgPO1eR3CYjk"
},
"source": [
"### Create a Cloud Storage bucket\n",
"\n",
"Create a storage bucket to store intermediate artifacts such as datasets."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "MzGDU7TWdts_"
},
"outputs": [],
"source": [
"BUCKET_URI = f\"gs://your-bucket-name-{PROJECT_ID}-unique\" # @param {type:\"string\"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-EcIXiGsCePi"
},
"source": [
"**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "NIq7R4HZCfIc"
},
"outputs": [],
"source": [
"! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "f18bb9bb7886"
},
"source": [
"#### Service Account\n",
"\n",
"You use a service account to create Vertex AI Pipeline jobs. If you do not want to use your project's Compute Engine service account, set `SERVICE_ACCOUNT` to another service account ID."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "14f47a082714"
},
"outputs": [],
"source": [
"SERVICE_ACCOUNT = \"[your-service-account]\" # @param {type:\"string\"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "5cacc638960a"
},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"\n",
"IS_COLAB = \"google.colab\" in sys.modules\n",
"\n",
"if (\n",
" SERVICE_ACCOUNT == \"\"\n",
" or SERVICE_ACCOUNT is None\n",
" or SERVICE_ACCOUNT == \"[your-service-account]\"\n",
"):\n",
" # Get your service account from gcloud\n",
" if not IS_COLAB:\n",
" shell_output = !gcloud auth list 2>/dev/null\n",
" SERVICE_ACCOUNT = shell_output[2].replace(\"*\", \"\").strip()\n",
"\n",
" else: # IS_COLAB:\n",
" shell_output = ! gcloud projects describe $PROJECT_ID\n",
" project_number = shell_output[-1].split(\":\")[1].strip().replace(\"'\", \"\")\n",
" SERVICE_ACCOUNT = f\"{project_number}-compute@developer.gserviceaccount.com\"\n",
"\n",
" print(\"Service Account:\", SERVICE_ACCOUNT)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cf0decaa4574"
},
"source": [
"#### Set service account access for Vertex AI Pipelines\n",
"\n",
"Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run this step once per service account."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "f5a82ff107b6"
},
"outputs": [],
"source": [
"! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI\n",
"\n",
"! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "mBwfdPM7M5X7"
},
"source": [
"### Load preprocessing data\n",
"\n",
"The notebook uses a preprocessed set of data you read from the Vertex AI Feature Store. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xmapX5-cNaWV"
},
"outputs": [],
"source": [
"PUBLIC_DATA_URI = \"gs://cloud-samples-data/vertex-ai/dataset-management/datasets/loan_eligibilty/data.csv\"\n",
"FEATURES_TRAIN_URI = f\"{BUCKET_URI}/data/features/snapshots/{UUID}\"\n",
"\n",
"!gsutil cp -r $PUBLIC_DATA_URI $FEATURES_TRAIN_URI"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-TRRTduWyx7f"
},
"source": [
"### Enabling Private Google Access for Dataproc Serverless"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "d-1ZXXtTyzI5"
},
"outputs": [],
"source": [
"SUBNETWORK = \"default\" # @param {type:\"string\"}\n",
"\n",
"!gcloud compute networks subnets list --regions=$REGION --filter=$SUBNETWORK\n",
"\n",
"!gcloud compute networks subnets update $SUBNETWORK \\\n",
"--region=$REGION \\\n",
"--enable-private-ip-google-access\n",
"\n",
"!gcloud compute networks subnets describe $SUBNETWORK \\\n",
"--region=$REGION \\\n",
"--format=\"get(privateIpGoogleAccess)\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "o7p_RIzNM02N"
},
"source": [
"### Create the Docker repository\n",
"\n",
"You create a Docker repository in the Artifact Registry for the custom dataproc image that you are going to create."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "KfTW_fMeWq3e"
},
"outputs": [],
"source": [
"# set repo name\n",
"REPO_NAME = \"loan-eligibility-spark-demo\"\n",
"\n",
"# create the repository\n",
"!gcloud artifacts repositories create $REPO_NAME \\\n",
" --repository-format=docker \\\n",
" --location=$REGION \\\n",
" --quiet \\\n",
" --description=\"loan eligibility spark docker repository\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "XoEqT2Y4DJmf"
},
"source": [
"### Import libraries and define constants"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"id": "pRUOFELefqf1"
},
"outputs": [],
"source": [
"# General\n",
"from pathlib import Path as path\n",
"from typing import NamedTuple\n",
"\n",
"from google.cloud import aiplatform as vertex_ai\n",
"from kfp.v2 import compiler, dsl\n",
"from kfp.v2.dsl import (ClassificationMetrics, Condition, Metrics, Output,\n",
" component)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "gQOV9ssPWbMB"
},
"outputs": [],
"source": [
"# Setup\n",
"DATAPROC_RUNTIME_VERSION = \"1.1.20\"\n",
"SRC = path(\"src\")\n",
"BUILD_PATH = path(\"build\")\n",
"DELIVERABLES = path(\"deliverables\")\n",
"DATA_PATH = path(\"data\")\n",
"RUNTIME_IMAGE = \"dataproc_serverless_custom_runtime\"\n",
"IMAGE_TAG = \"1.0.0\"\n",
"\n",
"# Pipeline\n",
"PIPELINE_NAME = \"pyspark-loan-eligibility-pipeline\"\n",
"PIPELINE_ROOT = f\"{BUCKET_URI}/pipelines\"\n",
"PIPELINE_PACKAGE_PATH = str(BUILD_PATH / f\"pipeline_{UUID}.json\")\n",
"RUNTIME_CONTAINER_IMAGE = f\"gcr.io/{PROJECT_ID}/{RUNTIME_IMAGE}:{IMAGE_TAG}\"\n",
"SUBNETWORK_URI = f\"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{SUBNETWORK}\"\n",
"ML_APPLICATION = \"loan-eligibility\"\n",
"TASK = \"sparkml\"\n",
"MODEL_TYPE = \"rfor\"\n",
"VERSION = \"1.0.0\"\n",
"MODEL_NAME = f\"{ML_APPLICATION}-{TASK}-{MODEL_TYPE}-{VERSION}\"\n",
"ARTIFACT_URI = f\"{BUCKET_URI}/deliverables/bundle/{UUID}\"\n",
"\n",
"# Preprocessing\n",
"PREPROCESSING_PYTHON_FILE_URI = f\"{BUCKET_URI}/src/data_preprocessing.py\"\n",
"PROCESSED_DATA_URI = f\"{BUCKET_URI}/data/processed\"\n",
"PREPROCESSING_ARGS = [\n",
" \"--train-data-path\",\n",
" FEATURES_TRAIN_URI,\n",
" \"--out-process-path\",\n",
" PROCESSED_DATA_URI,\n",
"]\n",
"\n",
"# Training\n",
"TRAINING_PYTHON_FILE_URI = f\"{BUCKET_URI}/src/model_training.py\"\n",
"MODEL_URI = f\"{BUCKET_URI}/deliverables/model/rfor/{UUID}/train_model\"\n",
"METRICS_URI = f\"{BUCKET_URI}/deliverables/metrics/rfor/{UUID}/train_metrics.json\"\n",
"TRAINING_ARGS = [\n",
" \"--train-path\",\n",
" PROCESSED_DATA_URI,\n",
" \"--model-path\",\n",
" MODEL_URI,\n",
" \"--metrics-path\",\n",
" METRICS_URI,\n",
"]\n",
"\n",
"# Condition\n",
"AUPR_THRESHOLD = 0.5\n",
"AUPR_HYPERTUNE_CONDITION = \"hypertune\"\n",
"\n",
"# Hypertuning\n",
"HPT_PYTHON_FILE_URI = f\"{BUCKET_URI}/src/hp_tuning.py\"\n",
"HPT_MODEL_URI = f\"{BUCKET_URI}/deliverables/model/rfor/{UUID}/model\"\n",
"HPT_METRICS_URI = f\"{BUCKET_URI}/deliverables/metrics/rfor/{UUID}/metrics.json\"\n",
"HPT_ARGS = [\n",
" \"--train-path\",\n",
" PROCESSED_DATA_URI,\n",
" \"--model-path\",\n",
" HPT_MODEL_URI,\n",
" \"--metrics-path\",\n",
" HPT_METRICS_URI,\n",
"]\n",
"HPT_BUNDLE_URI = f\"{ARTIFACT_URI}/model.zip\"\n",
"HPT_ARGS = [\n",
" \"--train-path\",\n",
" PROCESSED_DATA_URI,\n",
" \"--model-path\",\n",
" HPT_MODEL_URI,\n",
" \"--metrics-path\",\n",
" HPT_METRICS_URI,\n",
" \"--bundle-path\",\n",
" HPT_BUNDLE_URI,\n",
"]\n",
"HPT_RUNTIME_PROPERTIES = {\n",
" \"spark.jars.packages\": \"ml.combust.mleap:mleap-spark-base_2.12:0.21.1,ml.combust.mleap:mleap-spark_2.12:0.21.1\"\n",
"}\n",
"\n",
"# Experiment\n",
"EXPERIMENT_NAME = \"loan-eligibility\"\n",
"\n",
"# Deploy\n",
"SERVING_IMAGE_URI = f\"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/spark-ml-serving\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Z949NOrk-WXQ"
},
"source": [
"### Initialize the Vertex AI SDK client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "cnj-_0Ljzox_"
},
"outputs": [],
"source": [
"vertex_ai.init(\n",
" project=PROJECT_ID,\n",
" location=REGION,\n",
" staging_bucket=BUCKET_URI,\n",
" experiment=EXPERIMENT_NAME,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "LB2aM7VyRyZG"
},
"source": [
"## Build the Vertex Pipeline to train and deploy a Spark model\n",
"\n",
"In this case, the ML pipeline includes the following steps:\n",
"\n",
"1. Impute categorical and numerical variables with `DataprocPySparkBatchOp`\n",
"2. Train an `RandomForestClassifier` with `DataprocPySparkBatchOp`\n",
"3. Run a custom component in order to evaluate the model\n",
"\n",
"If the model respects the performance condition, then:\n",
"\n",
"4. Hypertune the `RandomForestClassifier` with `DataprocPySparkBatchOp`\n",
"5. Serializes the model to MLeap format to use the model outside of Spark.\n",
"\n",
"If the `deploy_model` pipeline parameter is set to `True`:\n",
"\n",
"6. Upload the model to Vertex AI Model Registry.\n",
"7. Creates a Vertex AI endpoint.\n",
"8. Deploys the model to the Vertex AI endpoint for serving online prediction requests.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ac5229530e16"
},
"source": [
"### Define the code for PySpark jobs\n",
"\n",
"Define the code for data-preprocessing, model-training and hyperparameter-tuning.\n",
"\n",
"Initialize a source directory for the code."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "542d3661d811"
},
"outputs": [],
"source": [
"# make a source directory to save the code\n",
"! mkdir $SRC\n",
"! echo \"\" > $SRC/__init__.py"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "f137006a70a0"
},
"source": [
"#### Create the source code for data-preprocessing\n",
"\n",
"Create the `data_preprocessing.py` file that ingests the data, preprocesses it for training and uploads the processed data to the Cloud Storage. Through this code, a Spark session is created with logging enabled. The preprocessing is handled through this session and involves converting the datatypes of the variables `label` and `loan_amount` from string to double. The arguments to this code are defined as follows:\n",
"\n",
"- `--train-data-path`: The GCS path of the training sample.\n",
"- `--out-process-path`: The path to save the processed data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "d15ad2543c21"
},
"outputs": [],
"source": [
"%%writefile $SRC/data_preprocessing.py\n",
"\n",
"#!/usr/bin/env python3\n",
"# -*- coding: utf-8 -*-\n",
"\n",
"\"\"\"\n",
"data_preprocessing.py is the module for\n",
"\n",
" - ingest data\n",
" - do simple preprocessing tasks\n",
" - upload processed data to gcs\n",
"\"\"\"\n",
"\n",
"# Libraries --------------------------------------------------------------------------------\n",
"import logging\n",
"import argparse\n",
"from pathlib import Path\n",
"import sys\n",
"\n",
"try:\n",
" from pyspark import SparkContext, SparkConf\n",
" from pyspark.sql import SparkSession\n",
"except ImportError as error:\n",
" print('WARN: Something wrong with pyspark library. Please check configuration settings!')\n",
" print(error)\n",
"\n",
"from pyspark.sql.types import StructType, DoubleType, StringType\n",
"\n",
"# Variables --------------------------------------------------------------------------------\n",
"DATA_SCHEMA = (StructType()\n",
" .add(\"label\", StringType(), True)\n",
" .add(\"loan_amount\", StringType(), True)\n",
" .add(\"loan_term\", StringType(), True)\n",
" .add(\"property_area\", StringType(), True)\n",
" .add(\"timestamp\", StringType(), True)\n",
" .add(\"entity_type_customer_id\", StringType(), True)\n",
" .add(\"feature_7\", DoubleType(), True)\n",
" .add(\"feature_3\", DoubleType(), True)\n",
" .add(\"feature_1\", DoubleType(), True)\n",
" .add(\"feature_9\", DoubleType(), True)\n",
" .add(\"feature_5\", DoubleType(), True)\n",
" .add(\"feature_0\", DoubleType(), True)\n",
" .add(\"feature_8\", DoubleType(), True)\n",
" .add(\"feature_4\", DoubleType(), True)\n",
" .add(\"feature_2\", DoubleType(), True)\n",
" .add(\"feature_6\", DoubleType(), True)\n",
" )\n",
"\n",
"ENTITY_CUSTOMER_ID = 'entity_type_customer_id'\n",
"FEATURE_STORE_IDS = ['timestamp', 'entity_type_customer_id']\n",
"CATEGORICAL_VARIABLES = ['loan_term', 'property_area']\n",
"IDX_CATEGORICAL_FEATURES = [f'{col}_idx' for col in CATEGORICAL_VARIABLES]\n",
"TARGET = 'label'\n",
"\n",
"\n",
"# Helpers ----------------------------------------------------------------------------------\n",
"\n",
"def set_logger():\n",
" \"\"\"\n",
" Set logger for the module\n",
" Returns:\n",
" logger: logger object\n",
" \"\"\"\n",
" fmt_pattern = \"%(asctime)s — %(name)s — %(levelname)s —\" \"%(funcName)s:%(lineno)d — %(message)s\"\n",
" main_logger = logging.getLogger(__name__)\n",
" main_logger.setLevel(logging.INFO)\n",
" main_logger.propagate = False\n",
" stream_handler = logging.StreamHandler(sys.stdout)\n",
" stream_handler.setLevel(logging.INFO)\n",
" formatter = logging.Formatter(fmt_pattern)\n",
" stream_handler.setFormatter(formatter)\n",
" main_logger.addHandler(stream_handler)\n",
" return main_logger\n",
"\n",
"\n",
"def get_args():\n",
" \"\"\"\n",
" Get arguments from command line\n",
" Returns:\n",
" args: arguments from command line\n",
" \"\"\"\n",
" args_parser = argparse.ArgumentParser()\n",
" args_parser.add_argument(\n",
" '--train-data-path',\n",
" help='The GCS path of training sample',\n",
" type=str,\n",
" required=True)\n",
" args_parser.add_argument(\n",
" '--out-process-path',\n",
" help='''\n",
" The path to load processed data. \n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=True)\n",
" return args_parser.parse_args()\n",
"\n",
"\n",
"# Main -------------------------------------------------------------------------------------\n",
"\n",
"def main(logger, args):\n",
" \"\"\"\n",
" Main function\n",
" Args:\n",
" logger: logger object\n",
" args: arguments from command line\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" # variables\n",
" train_data_path = args.train_data_path\n",
" output_data_path = args.out_process_path\n",
"\n",
" logger.info('initializing data preprocessing.')\n",
" logger.info('start spark session.')\n",
"\n",
" spark = (SparkSession.builder\n",
" .master(\"local[*]\")\n",
" .appName(\"loan eligibility\")\n",
" .getOrCreate())\n",
" try:\n",
" logger.info(f'spark version: {spark.sparkContext.version}')\n",
" logger.info('start ingesting data.')\n",
"\n",
" training_data_raw_df = (spark.read.option(\"header\", True)\n",
" .option(\"delimiter\", ',')\n",
" .schema(DATA_SCHEMA)\n",
" .csv(train_data_path)\n",
" .drop(*FEATURE_STORE_IDS))\n",
"\n",
" training_data_raw_df = training_data_raw_df.withColumn(\"label\",\n",
" training_data_raw_df.label.cast('double'))\n",
" training_data_raw_df = training_data_raw_df.withColumn(\"loan_amount\",\n",
" training_data_raw_df.loan_amount.cast('double'))\n",
" training_data_raw_df.show(truncate=False)\n",
"\n",
" logger.info(f'load prepared data to {output_data_path}.')\n",
" training_data_raw_df.write.mode('overwrite').csv(str(output_data_path), header=True)\n",
" except RuntimeError as main_error:\n",
" logger.error(main_error)\n",
" else:\n",
" logger.info('data preprocessing successfully completed!')\n",
" return 0\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" runtime_args = get_args()\n",
" runtime_logger = set_logger()\n",
" main(runtime_logger, runtime_args)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "399eba3ab133"
},
"source": [
"#### Create the source code for model-training\n",
"\n",
"Create the `model_training.py` file for training a random-forest classifier model on the training data. The training is performed using Spark ML inside a Spark session. The code fetches the training data from the Cloud storage bucket, processes it and trains the random-forest model. The trained model and the metrics obtained from the trained model (like AUC-ROC, accuracy, precision etc.) are then saved to the provided output Cloud Storage path. This code accepts the following arguments:\n",
"\n",
"- `--train-path`: The Cloud Storage path of the training sample.\n",
"- `--model-path`: The Cloud Storage path to store the trained model.\n",
"- `--metrics-path`: The Cloud Storage path to store the metrics of model."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "e6c59492b111"
},
"outputs": [],
"source": [
"%%writefile $SRC/model_training.py\n",
"\n",
"#!/usr/bin/env python3\n",
"# -*- coding: utf-8 -*-\n",
"\n",
"\"\"\"\n",
"model_training.py is the module for training spark pipeline\n",
"\"\"\"\n",
"\n",
"# Libraries --------------------------------------------------------------------------------\n",
"import logging\n",
"import sys\n",
"import argparse\n",
"from pathlib import Path as path\n",
"import tempfile\n",
"import json\n",
"from urllib.parse import urlparse\n",
"\n",
"try:\n",
" from pyspark import SparkContext, SparkConf\n",
" from pyspark.sql import SparkSession\n",
"except ImportError as e:\n",
" print('WARN: Something wrong with pyspark library. Please check configuration settings!')\n",
" print(e)\n",
"\n",
"from pyspark.sql.types import StructType, DoubleType, StringType\n",
"from pyspark.sql.functions import col, udf\n",
"from pyspark.sql.functions import round as spark_round\n",
"from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler\n",
"from pyspark.ml.classification import RandomForestClassifier\n",
"from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator\n",
"from pyspark.ml import Pipeline\n",
"\n",
"from google.cloud import storage\n",
"\n",
"# Variables --------------------------------------------------------------------------------\n",
"\n",
"# Data schema\n",
"DATA_SCHEMA = (StructType()\n",
" .add(\"label\", DoubleType(), True)\n",
" .add(\"loan_amount\", DoubleType(), True)\n",
" .add(\"loan_term\", StringType(), True)\n",
" .add(\"property_area\", StringType(), True)\n",
" .add(\"feature_7\", DoubleType(), True)\n",
" .add(\"feature_3\", DoubleType(), True)\n",
" .add(\"feature_1\", DoubleType(), True)\n",
" .add(\"feature_9\", DoubleType(), True)\n",
" .add(\"feature_5\", DoubleType(), True)\n",
" .add(\"feature_0\", DoubleType(), True)\n",
" .add(\"feature_8\", DoubleType(), True)\n",
" .add(\"feature_4\", DoubleType(), True)\n",
" .add(\"feature_2\", DoubleType(), True)\n",
" .add(\"feature_6\", DoubleType(), True)\n",
" )\n",
"\n",
"# Training\n",
"TARGET = 'label'\n",
"CATEGORICAL_VARIABLES = ['loan_term', 'property_area']\n",
"IDX_CATEGORICAL_FEATURES = [f'{col}_idx' for col in CATEGORICAL_VARIABLES]\n",
"REAL_TIME_FEATURES_VECTOR = 'real_time_features_vector'\n",
"REAL_TIME_FEATURES = 'real_time_features'\n",
"FEATURES_SELECTED = ['feature_0', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5',\n",
" 'feature_6', 'feature_7', 'feature_8', 'feature_9', 'real_time_features']\n",
"FEATURES = 'features'\n",
"RANDOM_SEED = 8\n",
"RANDOM_QUOTAS = [0.8, 0.2]\n",
"\n",
"\n",
"# Helpers ----------------------------------------------------------------------------------\n",
"def set_logger():\n",
" \"\"\"\n",
" Set logger\n",
" Returns:\n",
" logger: logger\n",
" \"\"\"\n",
" fmt_pattern = \"%(asctime)s — %(name)s — %(levelname)s —\" \"%(funcName)s:%(lineno)d — %(message)s\"\n",
" main_logger = logging.getLogger(__name__)\n",
" main_logger.setLevel(logging.INFO)\n",
" main_logger.propagate = False\n",
" stream_handler = logging.StreamHandler(sys.stdout)\n",
" stream_handler.setLevel(logging.INFO)\n",
" formatter = logging.Formatter(fmt_pattern)\n",
" stream_handler.setFormatter(formatter)\n",
" main_logger.addHandler(stream_handler)\n",
" return main_logger\n",
"\n",
"\n",
"def get_args():\n",
" \"\"\"\n",
" Get arguments\n",
" Returns:\n",
" args: arguments\n",
" \"\"\"\n",
" args_parser = argparse.ArgumentParser()\n",
" args_parser.add_argument(\n",
" '--train-path',\n",
" help='''\n",
" The GCS path of training data'\n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=True)\n",
" args_parser.add_argument(\n",
" '--model-path',\n",
" help='''\n",
" The GCS path to store the trained model. \n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=True)\n",
" args_parser.add_argument(\n",
" '--metrics-path',\n",
" help='''\n",
" The GCS path to store the metrics of model. \n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=True)\n",
" return args_parser.parse_args()\n",
"\n",
"\n",
"def build_preprocessing_components():\n",
" \"\"\"\n",
" Build preprocessing components\n",
" Returns:\n",
" data_preprocessing_stages: data preprocessing stages\n",
" \"\"\"\n",
" loan_term_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[0], outputCol=IDX_CATEGORICAL_FEATURES[0],\n",
" stringOrderType='frequencyDesc', handleInvalid='keep')\n",
" property_area_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[1], outputCol=IDX_CATEGORICAL_FEATURES[1],\n",
" stringOrderType='frequencyDesc', handleInvalid='keep')\n",
" data_preprocessing_stages = [loan_term_indexer, property_area_indexer]\n",
" return data_preprocessing_stages\n",
"\n",
"\n",
"def build_feature_engineering_components():\n",
" \"\"\"\n",
" Build feature engineering components\n",
" Returns:\n",
" feature_engineering_stages: feature engineering stages\n",
" \"\"\"\n",
" feature_engineering_stages = []\n",
" realtime_vector_assembler = VectorAssembler(inputCols=IDX_CATEGORICAL_FEATURES, outputCol=REAL_TIME_FEATURES_VECTOR)\n",
" realtime_scaler = StandardScaler(inputCol=REAL_TIME_FEATURES_VECTOR, outputCol=REAL_TIME_FEATURES)\n",
" features_vector_assembler = VectorAssembler(inputCols=FEATURES_SELECTED, outputCol=FEATURES)\n",
" feature_engineering_stages.extend((realtime_vector_assembler,\n",
" realtime_scaler,\n",
" features_vector_assembler))\n",
" return feature_engineering_stages\n",
"\n",
"\n",
"def build_training_model_component():\n",
" \"\"\"\n",
" Build training model component\n",
" Returns:\n",
" model_training_stage: model_training_stage\n",
" \"\"\"\n",
" model_training_stage = []\n",
" rfor = RandomForestClassifier(featuresCol=FEATURES, labelCol=TARGET, seed=RANDOM_SEED)\n",
" model_training_stage.append(rfor)\n",
" return model_training_stage\n",
"\n",
"\n",
"def build_pipeline(data_preprocessing_stages, feature_engineering_stages, model_training_stage):\n",
" \"\"\"\n",
" Build pipeline\n",
" Args:\n",
" data_preprocessing_stages: data preprocessing stages\n",
" feature_engineering_stages: feature engineering stages\n",
" model_training_stage: model_training_stage\n",
" Returns:\n",
" pipeline: pipeline\n",
" \"\"\"\n",
" pipeline = Pipeline(stages=data_preprocessing_stages + feature_engineering_stages + model_training_stage)\n",
" return pipeline\n",
"\n",
"\n",
"def get_true_score_prediction(predictions, target):\n",
" \"\"\"\n",
" Get true score prediction\n",
" Args:\n",
" predictions: predictions\n",
" target: target\n",
" Returns:\n",
" roc_dict: a dict of roc values for each class\n",
" \"\"\"\n",
" split1_udf = udf(lambda value: value[1].item(), DoubleType())\n",
" roc_dataset = predictions.select(col(target).alias('true'),\n",
" spark_round(split1_udf('probability'), 5).alias('score'),\n",
" 'prediction')\n",
" roc_df = roc_dataset.toPandas()\n",
" roc_dict = roc_df.to_dict(orient='list')\n",
" return roc_dict\n",
"\n",
"\n",
"def get_metrics(predictions, target, mode):\n",
" \"\"\"\n",
" Get metrics\n",
" Args:\n",
" predictions: predictions\n",
" target: target column name\n",
" mode: train or test\n",
" Returns:\n",
" metrics: metrics\n",
" \"\"\"\n",
" metric_labels = ['area_roc', 'area_prc', 'accuracy', 'f1', 'precision', 'recall']\n",
" metric_cols = ['true', 'score', 'prediction']\n",
" metric_keys = [f'{mode}_{ml}' for ml in metric_labels] + metric_cols\n",
" bc_evaluator = BinaryClassificationEvaluator(labelCol=target)\n",
" mc_evaluator = MulticlassClassificationEvaluator(labelCol=target)\n",
"\n",
" # areas, acc, f1, prec, rec\n",
" metric_values = []\n",
" area_roc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderROC'}), 5)\n",
" area_prc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderPR'}), 5)\n",
" acc = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"accuracy\"}), 5)\n",
" f1 = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"f1\"}), 5)\n",
" prec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"weightedPrecision\"}), 5)\n",
" rec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"weightedRecall\"}), 5)\n",
"\n",
" # true, score, prediction\n",
" roc_dict = get_true_score_prediction(predictions, target)\n",
" true = roc_dict['true']\n",
" score = roc_dict['score']\n",
" pred = roc_dict['prediction']\n",
"\n",
" metric_values.extend((area_roc, area_prc, acc, f1, prec, rec, true, score, pred))\n",
" metrics = dict(zip(metric_keys, metric_values))\n",
"\n",
" return metrics\n",
"\n",
"\n",
"def upload_file(bucket_name, source_file_name, destination_blob_name):\n",
" \"\"\"\n",
" Upload file to bucket\n",
" Args:\n",
" bucket_name: bucket name\n",
" source_file_name: source file name\n",
" destination_blob_name: destination blob name\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" storage_client = storage.Client()\n",
" bucket = storage_client.bucket(bucket_name)\n",
" blob = bucket.blob(destination_blob_name)\n",
" blob.upload_from_filename(source_file_name)\n",
"\n",
"\n",
"def write_metrics(bucket_name, metrics, destination, dir='/tmp'):\n",
" \"\"\"\n",
" Write metrics to file\n",
" Args:\n",
" bucket_name: bucket name\n",
" metrics: metrics\n",
" destination: destination\n",
" dir: directory to write file temporarily\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" temp_dir = tempfile.TemporaryDirectory(dir=dir)\n",
" temp_metrics_file_path = str(path(temp_dir.name) / path(destination).name)\n",
" with open(temp_metrics_file_path, 'w') as temp_file:\n",
" json.dump(metrics, temp_file)\n",
" upload_file(bucket_name, temp_metrics_file_path, destination)\n",
" temp_dir.cleanup()\n",
"\n",
"\n",
"# Main -------------------------------------------------------------------------------------\n",
"\n",
"def main(logger, args):\n",
" \"\"\"\n",
" Main function\n",
" Args:\n",
" logger: logger\n",
" args: args\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" train_path = args.train_path\n",
" model_path = args.model_path\n",
" metrics_path = args.metrics_path\n",
"\n",
" try:\n",
" logger.info('initializing pipeline training.')\n",
" logger.info('start spark session.')\n",
" spark = (SparkSession.builder\n",
" .master(\"local[*]\")\n",
" .appName(\"loan eligibility\")\n",
" .getOrCreate())\n",
" logger.info(f'spark version: {spark.sparkContext.version}')\n",
" logger.info('start bulding pipeline.')\n",
" preprocessing_stages = build_preprocessing_components()\n",
" feature_engineering_stages = build_feature_engineering_components()\n",
" model_training_stage = build_training_model_component()\n",
" pipeline = build_pipeline(preprocessing_stages, feature_engineering_stages, model_training_stage)\n",
"\n",
" logger.info(f'load train data from {train_path}.')\n",
" raw_data = (spark.read.format('csv')\n",
" .option(\"header\", \"true\")\n",
" .schema(DATA_SCHEMA)\n",
" .load(train_path))\n",
"\n",
" logger.info(f'fit model pipeline.')\n",
" train, test = raw_data.randomSplit(RANDOM_QUOTAS, seed=RANDOM_SEED)\n",
" pipeline_model = pipeline.fit(train)\n",
" predictions = pipeline_model.transform(test)\n",
" metrics = get_metrics(predictions, TARGET, 'test')\n",
" for m, v in metrics.items():\n",
" print(f'{m}: {v}')\n",
"\n",
" logger.info(f'load model pipeline in {model_path}.')\n",
" pipeline.write().overwrite().save(model_path)\n",
"\n",
" logger.info(f'Upload metrics under {metrics_path}.') \n",
" bucket = urlparse(model_path).netloc\n",
" metrics_file_path = urlparse(metrics_path).path.strip('/')\n",
" write_metrics(bucket, metrics, metrics_file_path)\n",
" \n",
" except RuntimeError as main_error:\n",
" logger.error(main_error)\n",
" else:\n",
" logger.info('model pipeline training successfully completed!')\n",
" return 0\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" runtime_args = get_args()\n",
" runtime_logger = set_logger()\n",
" main(runtime_logger, runtime_args)"
]
},
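{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before uploading the metrics file, the training script splits a `gs://` URI into a bucket name and an object name with `urllib.parse.urlparse`. The following is a minimal sketch of that parsing step. The URI is a hypothetical placeholder, not a path used by this notebook.\n",
"\n",
"```python\n",
"from urllib.parse import urlparse\n",
"\n",
"# Hypothetical URI, for illustration only.\n",
"metrics_uri = 'gs://my-bucket/metrics/metrics.json'\n",
"\n",
"parsed = urlparse(metrics_uri)\n",
"bucket_name = parsed.netloc  # the bucket name\n",
"blob_name = parsed.path.strip('/')  # the object name, without the leading slash\n",
"print(bucket_name, blob_name)\n",
"```\n",
"\n",
"The bucket name and the object name should come from the same URI. If they are taken from two different paths, the upload only lands in the intended place when both paths share a bucket."
]
},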
{
"cell_type": "markdown",
"metadata": {
"id": "91b5ef3ad59f"
},
"source": [
"#### Create the source code for hyperparameter-tuning\n",
"\n",
"Create the `hp_tuning.py` file, which tunes the hyperparameters of the random forest classifier using cross-validation. The script accepts the following arguments:\n",
"\n",
"- `--train-path`: The GCS path of the training sample.\n",
"- `--model-path`: The GCS path to store the trained model.\n",
"- `--metrics-path`: The GCS path to store the model metrics.\n",
"- `--bundle-path`: The GCS path to store the exported MLeap bundle.\n",
"\n",
"The hyperparameter tuning job also serializes the best-performing model to an MLeap bundle, which can be imported into Vertex AI as a model for serving predictions. See the *(Optional) Serve your model using Vertex AI* section below."
]
},
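{
"cell_type": "markdown",
"metadata": {},
"source": [
"To get a feel for the cost of the search, the following sketch counts the model fits implied by the grid constants that `hp_tuning.py` defines (`MAX_DEPTH`, `MAX_BINS`, `N_TREES` and `N_FOLDS`). It uses plain Python instead of Spark's `ParamGridBuilder`, so it only illustrates the arithmetic and is not the tuning code itself. Cross-validation fits one pipeline per parameter combination per fold, then refits the best combination on the full training split.\n",
"\n",
"```python\n",
"from itertools import product\n",
"\n",
"# Values copied from the constants in hp_tuning.py.\n",
"MAX_DEPTH = [5, 10, 15]\n",
"MAX_BINS = [24, 32, 40]\n",
"N_TREES = [25, 30, 35]\n",
"N_FOLDS = 5\n",
"\n",
"# One dictionary per parameter combination (the Cartesian product of the lists).\n",
"param_grid = [\n",
"    {'maxDepth': depth, 'maxBins': bins, 'numTrees': trees}\n",
"    for depth, bins, trees in product(MAX_DEPTH, MAX_BINS, N_TREES)\n",
"]\n",
"\n",
"# Every combination is fitted once per fold, plus one final refit of the best one.\n",
"n_fits = len(param_grid) * N_FOLDS + 1\n",
"print(len(param_grid), n_fits)\n",
"```\n",
"\n",
"Adding a value to any of the three lists increases the number of fits multiplicatively, which is worth keeping in mind when sizing the Dataproc Serverless batch."
]
},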
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "d5e7d6e03154"
},
"outputs": [],
"source": [
"%%writefile $SRC/hp_tuning.py\n",
"\n",
"#!/usr/bin/env python3\n",
"# -*- coding: utf-8 -*-\n",
"\n",
"\"\"\"\n",
"hp_tuning.py is the module for tuning the hyperparameters of the Spark pipeline\n",
"\"\"\"\n",
"\n",
"# Libraries --------------------------------------------------------------------------------\n",
"import logging\n",
"import sys\n",
"import argparse\n",
"from os import environ\n",
"from datetime import datetime\n",
"from pathlib import Path as path\n",
"import tempfile\n",
"from urllib.parse import urlparse, urljoin\n",
"import json\n",
"\n",
"try:\n",
" from pyspark import SparkContext, SparkConf\n",
" from pyspark.sql import SparkSession\n",
"except ImportError as e:\n",
" print('WARN: Something wrong with pyspark library. Please check configuration settings!')\n",
" print(e)\n",
" \n",
"import mleap.pyspark\n",
"from mleap.pyspark.spark_support import SimpleSparkSerializer\n",
"\n",
"from pyspark.sql.types import StructType, DoubleType, StringType\n",
"from pyspark.sql.functions import col, udf\n",
"from pyspark.sql.functions import round as spark_round\n",
"from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler\n",
"from pyspark.ml.classification import RandomForestClassifier\n",
"from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator\n",
"from pyspark.ml.tuning import ParamGridBuilder, CrossValidator\n",
"from pyspark.ml import Pipeline\n",
"\n",
"from google.cloud import storage\n",
"\n",
"# Variables --------------------------------------------------------------------------------\n",
"\n",
"# Data schema\n",
"DATA_SCHEMA = (StructType()\n",
" .add(\"label\", DoubleType(), True)\n",
" .add(\"loan_amount\", DoubleType(), True)\n",
" .add(\"loan_term\", StringType(), True)\n",
" .add(\"property_area\", StringType(), True)\n",
" .add(\"feature_7\", DoubleType(), True)\n",
" .add(\"feature_3\", DoubleType(), True)\n",
" .add(\"feature_1\", DoubleType(), True)\n",
" .add(\"feature_9\", DoubleType(), True)\n",
" .add(\"feature_5\", DoubleType(), True)\n",
" .add(\"feature_0\", DoubleType(), True)\n",
" .add(\"feature_8\", DoubleType(), True)\n",
" .add(\"feature_4\", DoubleType(), True)\n",
" .add(\"feature_2\", DoubleType(), True)\n",
" .add(\"feature_6\", DoubleType(), True)\n",
" )\n",
"\n",
"# Training\n",
"TARGET = 'label'\n",
"CATEGORICAL_VARIABLES = ['loan_term', 'property_area']\n",
"IDX_CATEGORICAL_FEATURES = [f'{col}_idx' for col in CATEGORICAL_VARIABLES]\n",
"REAL_TIME_FEATURES_VECTOR = 'real_time_features_vector'\n",
"REAL_TIME_FEATURES = 'real_time_features'\n",
"FEATURES_SELECTED = ['feature_0', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5',\n",
" 'feature_6', 'feature_7', 'feature_8', 'feature_9', 'real_time_features']\n",
"FEATURES = 'features'\n",
"RANDOM_SEED = 8\n",
"RANDOM_QUOTAS = [0.8, 0.2]\n",
"MAX_DEPTH = [5, 10, 15]\n",
"MAX_BINS = [24, 32, 40]\n",
"N_TREES = [25, 30, 35]\n",
"N_FOLDS = 5\n",
"\n",
"\n",
"# Helpers ----------------------------------------------------------------------------------\n",
"def set_logger():\n",
" \"\"\"\n",
" Set logger for the module\n",
" Returns:\n",
" logger: logger object\n",
" \"\"\"\n",
" fmt_pattern = \"%(asctime)s — %(name)s — %(levelname)s —\" \"%(funcName)s:%(lineno)d — %(message)s\"\n",
" main_logger = logging.getLogger(__name__)\n",
" main_logger.setLevel(logging.INFO)\n",
" main_logger.propagate = False\n",
" stream_handler = logging.StreamHandler(sys.stdout)\n",
" stream_handler.setLevel(logging.INFO)\n",
" formatter = logging.Formatter(fmt_pattern)\n",
" stream_handler.setFormatter(formatter)\n",
" main_logger.addHandler(stream_handler)\n",
" return main_logger\n",
"\n",
"\n",
"def get_args():\n",
" \"\"\"\n",
" Get arguments from command line\n",
" Returns:\n",
" args: arguments from command line\n",
" \"\"\"\n",
" args_parser = argparse.ArgumentParser()\n",
" args_parser.add_argument(\n",
" '--train-path',\n",
" help='''\n",
" The GCS path of training data'\n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=False)\n",
" args_parser.add_argument(\n",
" '--model-path',\n",
" help='''\n",
" The GCS path to store the trained model. \n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=False)\n",
" args_parser.add_argument(\n",
" '--metrics-path',\n",
" help='''\n",
" The GCS path to store the metrics of model. \n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=True)\n",
" args_parser.add_argument(\n",
" '--bundle-path',\n",
" help='''\n",
" The GCS path to store the exported MLeap bundle. \n",
" Format: \n",
" - locally: /path/to/dir\n",
" - cloud: gs://bucket/path\n",
" ''',\n",
" type=str,\n",
" required=True)\n",
" return args_parser.parse_args()\n",
"\n",
"\n",
"def build_preprocessing_components():\n",
" \"\"\"\n",
" Build preprocessing components\n",
" Returns:\n",
" preprocessing_components: preprocessing components\n",
" \"\"\"\n",
" loan_term_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[0], outputCol=IDX_CATEGORICAL_FEATURES[0],\n",
" stringOrderType='frequencyDesc', handleInvalid='keep')\n",
" property_area_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[1], outputCol=IDX_CATEGORICAL_FEATURES[1],\n",
" stringOrderType='frequencyDesc', handleInvalid='keep')\n",
" data_preprocessing_stages = [loan_term_indexer, property_area_indexer]\n",
" return data_preprocessing_stages\n",
"\n",
"\n",
"def build_feature_engineering_components():\n",
" \"\"\"\n",
" Build feature engineering components\n",
" Returns:\n",
" feature_engineering_components: feature engineering components\n",
" \"\"\"\n",
" feature_engineering_stages = []\n",
" realtime_vector_assembler = VectorAssembler(inputCols=IDX_CATEGORICAL_FEATURES, outputCol=REAL_TIME_FEATURES_VECTOR)\n",
" realtime_scaler = StandardScaler(inputCol=REAL_TIME_FEATURES_VECTOR, outputCol=REAL_TIME_FEATURES)\n",
" features_vector_assembler = VectorAssembler(inputCols=FEATURES_SELECTED, outputCol=FEATURES)\n",
" feature_engineering_stages.extend((realtime_vector_assembler,\n",
" realtime_scaler,\n",
" features_vector_assembler))\n",
" return feature_engineering_stages\n",
"\n",
"\n",
"def build_training_model_component():\n",
" \"\"\"\n",
" Build training model component\n",
" Returns:\n",
" training_model_component: training model component\n",
" \"\"\"\n",
" model_training_stage = []\n",
" rfor = RandomForestClassifier(featuresCol=FEATURES, labelCol=TARGET, seed=RANDOM_SEED)\n",
" model_training_stage.append(rfor)\n",
" return model_training_stage\n",
"\n",
"\n",
"def build_hp_pipeline(data_preprocessing_stages, feature_engineering_stages, model_training_stage):\n",
" \"\"\"\n",
" Build hyperparameter pipeline\n",
" Args:\n",
" data_preprocessing_stages: preprocessing components\n",
" feature_engineering_stages: feature engineering components\n",
" model_training_stage: training model component\n",
" Returns:\n",
" hp_pipeline: hyperparameter pipeline\n",
" \"\"\"\n",
" pipeline = Pipeline(stages=data_preprocessing_stages + feature_engineering_stages + model_training_stage)\n",
" params_grid = (ParamGridBuilder()\n",
" .addGrid(model_training_stage[0].maxDepth, MAX_DEPTH)\n",
" .addGrid(model_training_stage[0].maxBins, MAX_BINS)\n",
" .addGrid(model_training_stage[0].numTrees, N_TREES)\n",
" .build())\n",
" evaluator = BinaryClassificationEvaluator(labelCol=TARGET)\n",
" cross_validator = CrossValidator(estimator=pipeline,\n",
" estimatorParamMaps=params_grid,\n",
" evaluator=evaluator,\n",
" numFolds=N_FOLDS)\n",
" return cross_validator\n",
"\n",
"\n",
"def get_true_score_prediction(predictions, target):\n",
" \"\"\"\n",
" Get true score and prediction\n",
" Args:\n",
" predictions: predictions\n",
" target: target column\n",
" Returns:\n",
" roc_dict: a dict of roc values for each class\n",
" \"\"\"\n",
"\n",
" split1_udf = udf(lambda value: value[1].item(), DoubleType())\n",
" roc_dataset = predictions.select(col(target).alias('true'),\n",
" spark_round(split1_udf('probability'), 5).alias('score'),\n",
" 'prediction')\n",
" roc_df = roc_dataset.toPandas()\n",
" roc_dict = roc_df.to_dict(orient='list')\n",
" return roc_dict\n",
"\n",
"\n",
"def get_metrics(predictions, target, mode):\n",
" \"\"\"\n",
" Get metrics\n",
" Args:\n",
" predictions: predictions\n",
" target: target column\n",
" mode: train or test\n",
" Returns:\n",
" metrics: metrics\n",
" \"\"\"\n",
" metric_labels = ['area_roc', 'area_prc', 'accuracy', 'f1', 'precision', 'recall']\n",
" metric_cols = ['true', 'score', 'prediction']\n",
" metric_keys = [f'{mode}_{ml}' for ml in metric_labels] + metric_cols\n",
"\n",
" bc_evaluator = BinaryClassificationEvaluator(labelCol=target)\n",
" mc_evaluator = MulticlassClassificationEvaluator(labelCol=target)\n",
"\n",
" # areas, acc, f1, prec, rec\n",
" metric_values = []\n",
" area_roc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderROC'}), 5)\n",
" area_prc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderPR'}), 5)\n",
" acc = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"accuracy\"}), 5)\n",
" f1 = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"f1\"}), 5)\n",
" prec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"weightedPrecision\"}), 5)\n",
" rec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: \"weightedRecall\"}), 5)\n",
"\n",
" # true, score, prediction\n",
" roc_dict = get_true_score_prediction(predictions, target)\n",
" true = roc_dict['true']\n",
" score = roc_dict['score']\n",
" pred = roc_dict['prediction']\n",
"\n",
" metric_values.extend((area_roc, area_prc, acc, f1, prec, rec, true, score, pred))\n",
" metrics = dict(zip(metric_keys, metric_values))\n",
"\n",
" return metrics\n",
"\n",
"\n",
"def upload_file(bucket_name, source_file_name, destination_blob_name):\n",
" storage_client = storage.Client()\n",
" bucket = storage_client.bucket(bucket_name)\n",
" blob = bucket.blob(destination_blob_name)\n",
" blob.upload_from_filename(source_file_name)\n",
"\n",
"\n",
"def write_metrics(bucket_name, metrics, destination, dir='/tmp'):\n",
" temp_dir = tempfile.TemporaryDirectory(dir=dir)\n",
" temp_metrics_file_path = str(path(temp_dir.name) / path(destination).name)\n",
" with open(temp_metrics_file_path, 'w') as temp_file:\n",
" json.dump(metrics, temp_file)\n",
" upload_file(bucket_name, temp_metrics_file_path, destination)\n",
" temp_dir.cleanup()\n",
"\n",
"\n",
"# Main -------------------------------------------------------------------------------------\n",
"\n",
"def main(logger, args):\n",
" \"\"\"\n",
" Main function\n",
" Args:\n",
" logger: logger\n",
" args: args\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" train_path = args.train_path\n",
" model_path = args.model_path\n",
" metrics_path = args.metrics_path\n",
" bundle_path = args.bundle_path\n",
"\n",
" try:\n",
" logger.info('initializing pipeline training.')\n",
" logger.info('start spark session.')\n",
" spark = (SparkSession.builder\n",
" .master(\"local[*]\")\n",
" .appName(\"loan eligibility\")\n",
" .getOrCreate())\n",
" logger.info(f'spark version: {spark.sparkContext.version}')\n",
" logger.info('start building pipeline.')\n",
" preprocessing_stages = build_preprocessing_components()\n",
" feature_engineering_stages = build_feature_engineering_components()\n",
" model_training_stage = build_training_model_component()\n",
" pipeline_cross_validator = build_hp_pipeline(preprocessing_stages, feature_engineering_stages,\n",
" model_training_stage)\n",
" logger.info(f'load train data from {train_path}.')\n",
" raw_data = (spark.read.format('csv')\n",
" .option(\"header\", \"true\")\n",
" .schema(DATA_SCHEMA)\n",
" .load(train_path))\n",
" logger.info(f'fit model pipeline.')\n",
" train, test = raw_data.randomSplit(RANDOM_QUOTAS, seed=RANDOM_SEED)\n",
" pipeline_model = pipeline_cross_validator.fit(train)\n",
" predictions = pipeline_model.transform(test)\n",
" metrics = get_metrics(predictions, TARGET, 'test')\n",
" for m, v in metrics.items():\n",
" print(f'{m}: {v}')\n",
"\n",
" logger.info(f'load model pipeline in {model_path}.')\n",
" pipeline_model.write().overwrite().save(model_path)\n",
"\n",
" logger.info(f'upload metrics under {metrics_path}.')\n",
" bucket = urlparse(model_path).netloc\n",
" metrics_file_path = urlparse(metrics_path).path.strip('/')\n",
" write_metrics(bucket, metrics, metrics_file_path)\n",
" \n",
" logger.info('export MLeap bundle to temporary location')\n",
" pipeline_model.bestModel.serializeToBundle(f'jar:file:/tmp/bundle.zip', predictions)\n",
" \n",
" logger.info(f'upload MLeap bundle to {bundle_path}')\n",
" bundle_file_path = urlparse(bundle_path).path.strip('/')\n",
" bucket = urlparse(bundle_path).netloc\n",
" logger.info(f'Copying /tmp/bundle.zip to bucket {bucket} using object name {bundle_file_path} ...')\n",
" upload_file(bucket, '/tmp/bundle.zip', bundle_file_path)\n",
" \n",
" except RuntimeError as main_error:\n",
" logger.error(main_error)\n",
" else:\n",
" logger.info('model pipeline training successfully completed!')\n",
" return 0\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" runtime_args = get_args()\n",
" runtime_logger = set_logger()\n",
" main(runtime_logger, runtime_args)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_PnJIvsjWOcn"
},
"source": [
"### Upload source code\n",
"\n",
"In order to use the `DataprocPySparkBatchOp` from google-cloud-pipeline-components, you need to upload the code to the Cloud Storage bucket."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "PhrUpIqTWTD3"
},
"outputs": [],
"source": [
"! gsutil cp $SRC/__init__.py $BUCKET_URI/src/__init__.py\n",
"! gsutil cp $SRC/data_preprocessing.py $BUCKET_URI/src/data_preprocessing.py\n",
"! gsutil cp $SRC/model_training.py $BUCKET_URI/src/model_training.py\n",
"! gsutil cp $SRC/hp_tuning.py $BUCKET_URI/src/hp_tuning.py"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "68nYBB5GS9TB"
},
"source": [
"### Build a custom Dataproc Serverless container image\n",
"\n",
"Dataproc Serverless provides default runtime images. Learn more about the [Dataproc Serverless Spark runtime releases](https://cloud.google.com/dataproc-serverless/docs/concepts/versions/spark-runtime-versions).\n",
"\n",
"You can also use custom container images for your Dataproc Serverless workloads. The steps in this section build a custom container image that includes additional dependencies. You can specify the custom container image when you use the `DataprocPySparkBatchOp` component to launch the workload within a pipeline."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GF9_5IGYqLAX"
},
"source": [
"#### Define the Dataproc Serverless custom runtime image"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "QgdLY9lpYkum"
},
"outputs": [],
"source": [
"!mkdir -m 777 -p $BUILD_PATH"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "nWiGHdEibCcv"
},
"outputs": [],
"source": [
"%%writefile $BUILD_PATH/Dockerfile\n",
"\n",
"# Debian 11 is recommended.\n",
"FROM debian:11-slim\n",
"\n",
"# Suppress interactive prompts\n",
"ENV DEBIAN_FRONTEND=noninteractive\n",
"\n",
"# (Required) Install utilities required by Spark scripts.\n",
"RUN apt update && apt install -y procps tini\n",
"\n",
"# (Optional) Add extra jars.\n",
"ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/\n",
"ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'\n",
"RUN mkdir -p \"${SPARK_EXTRA_JARS_DIR}\"\n",
"COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar \"${SPARK_EXTRA_JARS_DIR}\"\n",
"\n",
"# (Optional) Install and configure Miniconda3.\n",
"ENV CONDA_HOME=/opt/miniconda3\n",
"ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python\n",
"ENV PATH=${CONDA_HOME}/bin:${PATH}\n",
"COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .\n",
"RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \\\n",
" && ${CONDA_HOME}/bin/conda config --system --set always_yes True \\\n",
" && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \\\n",
" && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \\\n",
" && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict\n",
"\n",
"# (Optional) Install Conda packages.\n",
"#\n",
"# The following packages are installed in the default image. It is strongly\n",
"# recommended to include all of them.\n",
"#\n",
"# Use mamba solver to install packages quickly.\n",
"RUN ${CONDA_HOME}/bin/conda install -n base conda-libmamba-solver\n",
"RUN ${CONDA_HOME}/bin/conda install \\\n",
" cython \\\n",
" fastavro \\\n",
" fastparquet \\\n",
" gcsfs \\\n",
" google-cloud-bigquery-storage \\\n",
" google-cloud-bigquery[pandas] \\\n",
" google-cloud-dataproc \\\n",
" numpy \\\n",
" pandas \\\n",
" python \\\n",
" scikit-image \\\n",
" scikit-learn \\\n",
" scipy \\\n",
" mleap --solver=libmamba\n",
"\n",
"# (Required) Create the 'spark' group/user.\n",
"# The GID and UID must be 1099. Home directory is required.\n",
"RUN groupadd -g 1099 spark\n",
"RUN useradd -u 1099 -g 1099 -d /home/spark -m spark\n",
"USER spark"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "C8DsE4wHqI7B"
},
"source": [
"#### Download the `spark-bigquery-with-dependencies` jar file and the Miniconda installer"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "4N8m-K1ldt11"
},
"outputs": [],
"source": [
"!gsutil cp gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar $BUILD_PATH\n",
"!wget -P $BUILD_PATH https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZXzI2xInqb3V"
},
"source": [
"#### Build the Dataproc Serverless custom runtime using Cloud Build\n",
"\n",
"**Note:** This step may take up to 20 minutes to complete."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "RnpRqGrFkPaH"
},
"outputs": [],
"source": [
"!gcloud builds submit --tag $RUNTIME_CONTAINER_IMAGE $BUILD_PATH --machine-type=N1_HIGHCPU_32 --timeout=3600s --verbosity=info"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "YhMe6FXw1Ku9"
},
"source": [
"### Build custom components for pipeline arguments\n",
"\n",
"To pass job arguments to the Dataproc jobs, you create a custom component for each step of the pipeline.\n",
"\n",
"#### Create component for passing args to preprocessing component\n",
"\n",
"The following component passes the args `--train-data-path` and `--out-process-path` in the required format for the preprocessing function defined earlier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "DZDfkJKJPehg"
},
"outputs": [],
"source": [
"@component(base_image=\"python:3.8-slim\")\n",
"def build_preprocessing_args(train_data_path: str, processed_data_path: str) -> list:\n",
" return [\n",
" \"--train-data-path\",\n",
" train_data_path,\n",
" \"--out-process-path\",\n",
" processed_data_path,\n",
" ]"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jAK7Coju1XOp"
},
"source": [
"#### Create component for passing args to training component\n",
"\n",
"The following component passes the args `--train-path`, `--model-path` and `--metrics-path` in the required format for the model training function defined earlier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "y6wxBk12QvXT"
},
"outputs": [],
"source": [
"@component(base_image=\"python:3.8-slim\")\n",
"def build_training_args(train_path: str, model_path: str, metrics_path: str) -> list:\n",
" return [\n",
" \"--train-path\",\n",
" train_path,\n",
" \"--model-path\",\n",
" model_path,\n",
" \"--metrics-path\",\n",
" metrics_path,\n",
" ]"
]
},
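{
"cell_type": "markdown",
"metadata": {},
"source": [
"The list returned by the component is a flat sequence of flags and values, which is the form `argparse` expects on the receiving side. The following sketch checks that round trip locally. It redefines the function without the `@component` decorator and rebuilds only the three flags from `model_training.py`, and the `gs://` paths are hypothetical placeholders.\n",
"\n",
"```python\n",
"import argparse\n",
"\n",
"\n",
"def build_training_args(train_path, model_path, metrics_path):\n",
"    # Same list as the component above, without the KFP decorator.\n",
"    return [\n",
"        '--train-path', train_path,\n",
"        '--model-path', model_path,\n",
"        '--metrics-path', metrics_path,\n",
"    ]\n",
"\n",
"\n",
"# Minimal parser with the same three flags that model_training.py declares.\n",
"parser = argparse.ArgumentParser()\n",
"for flag in ('--train-path', '--model-path', '--metrics-path'):\n",
"    parser.add_argument(flag, type=str, required=True)\n",
"\n",
"# Hypothetical paths, for illustration only.\n",
"job_args = build_training_args(\n",
"    'gs://my-bucket/data/train',\n",
"    'gs://my-bucket/model',\n",
"    'gs://my-bucket/metrics/metrics.json',\n",
")\n",
"args = parser.parse_args(job_args)\n",
"print(args.train_path, args.model_path, args.metrics_path)\n",
"```\n",
"\n",
"Note that `argparse` converts the dashes in each flag name to underscores, so `--train-path` is read back as `args.train_path`."
]
},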
{
"cell_type": "markdown",
"metadata": {
"id": "5VesQlnEm4qR"
},
"source": [
"#### Create model evaluation custom component\n",
"\n",
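"Define the component that processes the metrics for model evaluation. The component reads the metrics file at `metrics_uri`, which the model training step writes. It logs the scalar metrics to the `metrics` output, logs the ROC curve and the confusion matrix to the `plots` output, and returns the area under the precision-recall curve as `threshold_metric`."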
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "sU2ScYvnpnkE"
},
"outputs": [],
"source": [
"@component(\n",
" base_image=\"python:3.8\",\n",
" packages_to_install=[\"numpy==1.21.2\", \"pandas==1.3.3\", \"scikit-learn==0.24.2\"],\n",
")\n",
"def evaluate_model(\n",
" metrics_uri: str,\n",
" metrics: Output[Metrics],\n",
" plots: Output[ClassificationMetrics],\n",
") -> NamedTuple(\"Outputs\", [(\"threshold_metric\", float)]):\n",
" # Libraries --------------------------------------------------------------------------------------------------------------------------\n",
" import json\n",
"\n",
" import numpy as np\n",
" from sklearn.metrics import confusion_matrix, roc_curve\n",
"\n",
" # Variables --------------------------------------------------------------------------------------------------------------------------\n",
" metrics_path = metrics_uri.replace(\"gs://\", \"/gcs/\")\n",
" labels = [\"not eligible\", \"eligible\"]\n",
"\n",
" # Helpers --------------------------------------------------------------------------------------------------------------------------\n",
" def calculate_roc(metrics, true, score):\n",
" y_true_np = np.array(metrics[true])\n",
" y_score_np = np.array(metrics[score])\n",
" fpr, tpr, thresholds = roc_curve(\n",
" y_true=y_true_np, y_score=y_score_np, pos_label=True\n",
" )\n",
" return fpr, tpr, thresholds\n",
"\n",
" def calculate_confusion_matrix(metrics, true, prediction):\n",
" y_true_np = np.array(metrics[true])\n",
" y_pred_np = np.array(metrics[prediction])\n",
" c_matrix = confusion_matrix(y_true_np, y_pred_np)\n",
" return c_matrix\n",
"\n",
" # Main -------------------------------------------------------------------------------------------------------------------------------\n",
" with open(metrics_path) as json_file:\n",
" metrics_dict = json.load(json_file)\n",
"\n",
" area_roc = metrics_dict[\"test_area_roc\"]\n",
" area_prc = metrics_dict[\"test_area_prc\"]\n",
" acc = metrics_dict[\"test_accuracy\"]\n",
" f1 = metrics_dict[\"test_f1\"]\n",
" prec = metrics_dict[\"test_precision\"]\n",
" rec = metrics_dict[\"test_recall\"]\n",
"\n",
" metrics.log_metric(\"Test_areaUnderROC\", area_roc)\n",
" metrics.log_metric(\"Test_areaUnderPRC\", area_prc)\n",
" metrics.log_metric(\"Test_Accuracy\", acc)\n",
" metrics.log_metric(\"Test_f1-score\", f1)\n",
" metrics.log_metric(\"Test_Precision\", prec)\n",
" metrics.log_metric(\"Test_Recall\", rec)\n",
"\n",
" fpr, tpr, thresholds = calculate_roc(metrics_dict, \"true\", \"score\")\n",
" c_matrix = calculate_confusion_matrix(metrics_dict, \"true\", \"prediction\")\n",
" plots.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())\n",
" plots.log_confusion_matrix(labels, c_matrix.tolist())\n",
"\n",
" component_outputs = NamedTuple(\n",
" \"Outputs\",\n",
" [\n",
" (\"threshold_metric\", float),\n",
" ],\n",
" )\n",
"\n",
" return component_outputs(area_prc)"
]
},
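{
"cell_type": "markdown",
"metadata": {},
"source": [
"The evaluation component relies on two conventions: the key names that `get_metrics` writes to the metrics file, and the `/gcs/` prefix under which Cloud Storage buckets are mounted inside the component. The following sketch reproduces both with plain Python. The bucket and file names are hypothetical placeholders.\n",
"\n",
"```python\n",
"# Key names as built by get_metrics in the training scripts with mode='test'.\n",
"metric_labels = ['area_roc', 'area_prc', 'accuracy', 'f1', 'precision', 'recall']\n",
"metric_cols = ['true', 'score', 'prediction']\n",
"mode = 'test'\n",
"metric_keys = [f'{mode}_{label}' for label in metric_labels] + metric_cols\n",
"\n",
"# Hypothetical URI, for illustration only.\n",
"metrics_uri = 'gs://my-bucket/metrics/metrics.json'\n",
"\n",
"# Same rewrite as in evaluate_model: address the object as a local file path.\n",
"metrics_path = metrics_uri.replace('gs://', '/gcs/')\n",
"print(metric_keys)\n",
"print(metrics_path)\n",
"```\n",
"\n",
"The scalar keys carry the `test_` prefix, while the `true`, `score` and `prediction` lists do not, which is why `evaluate_model` looks them up under different names."
]
},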
{
"cell_type": "markdown",
"metadata": {
"id": "28f3d22dd97f"
},
"source": [
"#### Create component for passing args to hyperparameter tuning component\n",
"\n",
"The following component passes the args `--train-path`, `--model-path`, `--metrics-path`, and `--bundle-path` in the required format for the hyperparameter tuning function defined earlier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "X4ZFmAe7D0M0"
},
"outputs": [],
"source": [
"@component(base_image=\"python:3.8-slim\")\n",
"def build_hpt_args(\n",
" train_path: str,\n",
" model_path: str,\n",
" metrics_path: str,\n",
" bundle_path: str,\n",
") -> list:\n",
" return [\n",
" \"--train-path\",\n",
" train_path,\n",
" \"--model-path\",\n",
" model_path,\n",
" \"--metrics-path\",\n",
" metrics_path,\n",
" \"--bundle-path\",\n",
" bundle_path,\n",
" ]"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0db73fff95b3"
},
"source": [
"### (Optional) Serve your model using Vertex AI\n",
"\n",
"The hyperparameter tuning task exports the best performing model as an MLeap bundle. The MLeap bundle can be imported into the Vertex AI Model Registry and used for prediction serving. See [Serving Spark ML model using Vertex AI](https://cloud.google.com/architecture/spark-ml-model-with-vertexai) for more information.\n",
"\n",
"Enable import of the MLeap bundle into the Vertex AI Model Registry and online prediction serving."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2d7e7c8fc21b"
},
"outputs": [],
"source": [
"# Set DEPLOY_MODEL to True\n",
"DEPLOY_MODEL = True"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0bb792622d9f"
},
"source": [
"### Build the model serving container image\n",
"\n",
"A *serving container image* is required to import your model into the Model Registry. The serving container image provides the model serving implementation for the model. Learn more about [serving Spark ML models using Vertex AI](https://cloud.google.com/architecture/spark-ml-model-with-vertexai).\n",
"\n",
"**Note:** this step may take approximately 5 to 10 minutes to complete."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "4703a0f969a3"
},
"outputs": [],
"source": [
"DEPLOY_MODEL_CONDITION = 'deploy'\n",
"\n",
"if DEPLOY_MODEL:\n",
"\n",
" import os\n",
" \n",
" CWD = os.getcwd()\n",
"\n",
" # Clone and build the scala-sbt cloud builder\n",
" ! git clone https://github.com/GoogleCloudPlatform/cloud-builders-community.git\n",
" ! cd {CWD}/cloud-builders-community/scala-sbt && \\\n",
" gcloud builds submit .\n",
"\n",
" # Clone and build the serving container code\n",
" ! cd {CWD} && git clone https://github.com/GoogleCloudPlatform/vertex-ai-spark-ml-serving.git\n",
" ! cd {CWD}/vertex-ai-spark-ml-serving && \\\n",
" gcloud builds submit --config=cloudbuild.yaml \\\n",
" --substitutions=\"_LOCATION={REGION},_REPOSITORY={REPO_NAME},_IMAGE=spark-ml-serving\" ."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "947b51adc087"
},
"source": [
"### Create component for importing a model artifact into a pipeline\n",
"\n",
"The pipeline uses the `ModelImportOp` component to import (upload) a model to Vertex AI Model Registry.\n",
"\n",
"The `import_model_artifact` python component creates a model artifact that can be passed to the `ModelImportOp` component."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2ed96e7ad046"
},
"outputs": [],
"source": [
"@dsl.component(\n",
" base_image=\"python:3.8-slim\",\n",
" packages_to_install=[\"google-cloud-aiplatform\"],\n",
")\n",
"def import_model_artifact(\n",
" model: dsl.Output[dsl.Artifact], artifact_uri: str, serving_image_uri: str\n",
"):\n",
" model.metadata[\"containerSpec\"] = {\n",
" \"imageUri\": serving_image_uri,\n",
" \"healthRoute\": \"/health\",\n",
" \"predictRoute\": \"/predict\",\n",
" }\n",
" model.uri = artifact_uri"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0d83d9e80923"
},
"source": [
"### Define the schema for model serving\n",
"\n",
"The serving container requires the model schema in JSON format, which is read during container startup. Learn more about [providing the model schema](https://cloud.google.com/architecture/spark-ml-model-with-vertexai#provide_the_model_schema).\n",
"\n",
"Write the model schema file:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "521d2f4d7992"
},
"outputs": [],
"source": [
"%%writefile $SRC/schema.json\n",
"{\n",
" \"input\": [\n",
" {\n",
" \"name\": \"loan_amount\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"loan_term\",\n",
" \"type\": \"STRING\"\n",
" },\n",
" {\n",
" \"name\": \"property_area\",\n",
" \"type\": \"STRING\"\n",
" },\n",
" {\n",
" \"name\": \"feature_7\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_3\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_1\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_9\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_5\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_0\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_8\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_4\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_2\",\n",
" \"type\": \"DOUBLE\"\n",
" },\n",
" {\n",
" \"name\": \"feature_6\",\n",
" \"type\": \"DOUBLE\"\n",
" }\n",
" ],\n",
" \"output\": [\n",
" {\n",
" \"name\": \"prediction\",\n",
" \"type\": \"DOUBLE\"\n",
" }\n",
" ]\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "d0b88e26570a"
},
"source": [
"### Copy the model schema configuration file to GCS.\n",
"\n",
"The serving container reads the model schema file location from the `AIP_STORAGE_URI` environment at startup. See [Import the model into Vertex AI](https://cloud.google.com/architecture/spark-ml-model-with-vertexai#import-the-model-into-vertex-ai) for more information."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "b7763bb558f3"
},
"outputs": [],
"source": [
"! gsutil cp $SRC/schema.json $ARTIFACT_URI/schema.json"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "wQd9M1_9bif7"
},
"source": [
"### Define your workflow as a Vertex AI Pipeline\n",
"\n",
"Use the Kubeflow Pipelines SDK to define your workflow as a machine learning pipeline. The pipeline uses the custom components defined earlier, in addition to components from the `google-cloud-pipeline-components` package."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "yX2u9hXDWtpp"
},
"outputs": [],
"source": [
"@dsl.pipeline(name=PIPELINE_NAME, description=\"A pipeline to train a PySpark model.\")\n",
"def pipeline(\n",
" preprocessing_main_python_file_uri: str = PREPROCESSING_PYTHON_FILE_URI,\n",
" train_data_path: str = FEATURES_TRAIN_URI,\n",
" preprocessed_data_path: str = PROCESSED_DATA_URI,\n",
" training_main_python_file_uri: str = TRAINING_PYTHON_FILE_URI,\n",
" train_path: str = PROCESSED_DATA_URI,\n",
" model_path: str = MODEL_URI,\n",
" metrics_path: str = METRICS_URI,\n",
" threshold: float = AUPR_THRESHOLD,\n",
" hpt_main_python_file_uri: str = HPT_PYTHON_FILE_URI,\n",
" hpt_model_path: str = HPT_MODEL_URI,\n",
" hpt_metrics_path: str = HPT_METRICS_URI,\n",
" hpt_bundle_path: str = HPT_BUNDLE_URI,\n",
" custom_container_image: str = RUNTIME_CONTAINER_IMAGE,\n",
" model_name: str = MODEL_NAME,\n",
" project_id: str = PROJECT_ID,\n",
" location: str = REGION,\n",
" subnetwork_uri: str = SUBNETWORK_URI,\n",
" deploy_model: bool = DEPLOY_MODEL,\n",
" artifact_uri: str = ARTIFACT_URI,\n",
" serving_image_uri: str = SERVING_IMAGE_URI,\n",
" dataproc_runtime_version: str = DATAPROC_RUNTIME_VERSION,\n",
"):\n",
" from google_cloud_pipeline_components.v1.dataproc import \\\n",
" DataprocPySparkBatchOp\n",
" from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,\n",
" ModelDeployOp)\n",
" from google_cloud_pipeline_components.v1.model import ModelUploadOp\n",
"\n",
" # build preprocessed data args\n",
" build_preprocessing_args_op = build_preprocessing_args(\n",
" train_data_path=train_data_path, processed_data_path=preprocessed_data_path\n",
" )\n",
"\n",
" # preprocess data\n",
" data_preprocessing_op = DataprocPySparkBatchOp(\n",
" project=project_id,\n",
" location=location,\n",
" container_image=custom_container_image,\n",
" main_python_file_uri=preprocessing_main_python_file_uri,\n",
" args=build_preprocessing_args_op.output,\n",
" subnetwork_uri=subnetwork_uri,\n",
" runtime_config_version=dataproc_runtime_version,\n",
" ).after(build_preprocessing_args_op)\n",
"\n",
" # build training data args\n",
" build_training_args_op = build_training_args(\n",
" train_path=train_path,\n",
" model_path=model_path,\n",
" metrics_path=metrics_path,\n",
" ).after(data_preprocessing_op)\n",
"\n",
" # training model\n",
" model_training_op = DataprocPySparkBatchOp(\n",
" project=project_id,\n",
" location=location,\n",
" container_image=custom_container_image,\n",
" main_python_file_uri=training_main_python_file_uri,\n",
" args=build_training_args_op.output,\n",
" subnetwork_uri=subnetwork_uri,\n",
" runtime_config_version=dataproc_runtime_version,\n",
" ).after(build_training_args_op)\n",
"\n",
" evaluate_model_op = evaluate_model(metrics_uri=metrics_path).after(\n",
" model_training_op\n",
" )\n",
"\n",
" # evaluate condition\n",
" with Condition(\n",
" evaluate_model_op.outputs[\"threshold_metric\"] >= threshold,\n",
" name=AUPR_HYPERTUNE_CONDITION,\n",
" ):\n",
" build_hpt_args_op = build_hpt_args(\n",
" train_path=train_path,\n",
" model_path=hpt_model_path,\n",
" metrics_path=hpt_metrics_path,\n",
" bundle_path=hpt_bundle_path,\n",
" ).after(evaluate_model_op)\n",
"\n",
" # hyperparameter tuning\n",
" hyperparameter_tuning_op = DataprocPySparkBatchOp(\n",
" project=project_id,\n",
" location=location,\n",
" container_image=custom_container_image,\n",
" main_python_file_uri=hpt_main_python_file_uri,\n",
" args=build_hpt_args_op.output,\n",
" runtime_config_properties=HPT_RUNTIME_PROPERTIES,\n",
" subnetwork_uri=subnetwork_uri,\n",
" # TODO: change to Dataproc Serverless Runtime 1.1.x image when MLeap supports Spark 3.3\n",
" runtime_config_version=\"1.0.29\",\n",
" ).after(model_training_op)\n",
"\n",
" # evaluate condition to upload and deploy model to Vertex AI\n",
" with Condition(\n",
" # kfp casts `bool` parameter to `str`\n",
" deploy_model == \"True\",\n",
" name=DEPLOY_MODEL_CONDITION,\n",
" ):\n",
" # import the model into the pipeline as a kfp model artifact\n",
" import_model_artifact_op = import_model_artifact(\n",
" artifact_uri=artifact_uri,\n",
" serving_image_uri=serving_image_uri,\n",
" )\n",
"\n",
" # upload model to Vertex AI\n",
" model_upload_op = ModelUploadOp(\n",
" project=project_id,\n",
" location=location,\n",
" display_name=model_name,\n",
" unmanaged_container_model=import_model_artifact_op.outputs[\"model\"],\n",
" ).after(hyperparameter_tuning_op)\n",
"\n",
" # create a serving endpoint\n",
" endpoint_op = EndpointCreateOp(\n",
" project=project_id,\n",
" location=location,\n",
" display_name=model_name,\n",
" ).after(model_upload_op)\n",
"\n",
" # deploy model to the serving endpoint\n",
" _ = ModelDeployOp(\n",
" model=model_upload_op.outputs[\"model\"],\n",
" endpoint=endpoint_op.outputs[\"endpoint\"],\n",
" dedicated_resources_machine_type=\"n1-standard-2\",\n",
" dedicated_resources_min_replica_count=1,\n",
" dedicated_resources_max_replica_count=1,\n",
" ).after(endpoint_op)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "s4ePytY8t3bu"
},
"source": [
"### Compile your pipeline into a JSON file\n",
"\n",
"Now that you define the workflow of your pipeline, you compile the pipeline into a JSON format."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "uFyCaNPIWtsU"
},
"outputs": [],
"source": [
"compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE_PATH)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Zq26zYhQb0qm"
},
"source": [
"### Submit your pipeline run\n",
"\n",
"Next, you use the Vertex AI Python SDK to submit and run your pipeline through Vertex AI Pipelines.\n",
"\n",
"The parameters, artifacts, and metrics produced from the pipeline run are automatically captured into Vertex AI Experiments as an experiment run."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "hwzYIoEabwRx"
},
"outputs": [],
"source": [
"pipeline = vertex_ai.PipelineJob(\n",
" display_name=PIPELINE_NAME,\n",
" template_path=PIPELINE_PACKAGE_PATH,\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=False,\n",
")\n",
"\n",
"pipeline.submit(service_account=SERVICE_ACCOUNT, experiment=EXPERIMENT_NAME)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2e428aab1826"
},
"source": [
"### Check the status of your pipeline run\n",
"\n",
"Finally, you can check the status of your pipeline through the link provided in the output of the earlier cell. Alternately, you can use `wait()` method from the below cell to wait till the pipeline executes completely and check the status of the pipeline execution."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "b05ed7a9cf3d"
},
"outputs": [],
"source": [
"pipeline.wait()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0574e2941aaf"
},
"source": [
"### (Optional) View experiment runs\n",
"\n",
"You can retrieve the parameters, artifacts, and metrics for all experiment runs as a pandas DataFrame. See [Compare and analyze runs](https://cloud.google.com/vertex-ai/docs/experiments/compare-analyze-runs) for more information on the topic."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "4f5ab528a98e"
},
"outputs": [],
"source": [
"# get the experiment by name\n",
"experiment = vertex_ai.Experiment(experiment_name=EXPERIMENT_NAME)\n",
"\n",
"# export the data as a dataframe\n",
"experiment_df = experiment.get_data_frame()\n",
"\n",
"# Show successfully completed experiment runs, sorted by F1 score\n",
"experiment_df.query('state == \"COMPLETE\"').sort_values(\n",
" \"metric.Test_f1-score\", ascending=False\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "b584afa5a1b1"
},
"source": [
"### (Optional) Get online predictions from the deployed model\n",
"\n",
"You can request online predictions if the model was deployed to a Vertex AI endpoint. Use the `google-cloud-aiplatform` client library to request predictions, or you can use `curl`.\n",
"\n",
"For this model, the prediction response contains the predicted label (`0 == not eligible`, `1 == eligible`) for each prediction instance that is sent to the endpoint.\n",
"\n",
"#### Use `google-cloud-aiplatform` to request online predictions\n",
"\n",
"The following cell demonstrates how to use the `google-cloud-aiplatform` client library to request predictions from one or more instances."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "f6623204ee52"
},
"outputs": [],
"source": [
"instances = [\n",
" [214.0, \"360\", \"Rural\", 2.13, 2.21, 0.0, 0.0, 2.31, 2.01, 0.0, 0.0, 0.0, 0.0],\n",
" [213.0, \"360\", \"Semiurban\", 2.03, 2.11, 0.0, 0.0, 2.13, 2.02, 0.0, 0.0, 0.0, 0.0],\n",
"]\n",
"\n",
"endpoint = vertex_ai.Endpoint.list(filter=f'display_name=\"{MODEL_NAME}\"')[-1]\n",
"endpoint.predict(instances)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "1a2104c45e21"
},
"source": [
"#### Use `curl` to request online predictions\n",
"\n",
"To use `curl`, first write the prediction instances to a file:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "12d068e1877c"
},
"outputs": [],
"source": [
"%%writefile instances.json\n",
"{\n",
" \"instances\": [\n",
" [214.0, \"360\", \"Rural\", 2.13, 2.21, 0.0, 0.0, 2.31, 2.01, 0.0, 0.0, 0.0, 0.0],\n",
" [213.0, \"360\", \"Semiurban\", 2.03, 2.11, 0.0, 0.0, 2.13, 2.02, 0.0, 0.0, 0.0, 0.0]\n",
" ]\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "b7cbfec4537d"
},
"source": [
"Use `curl` to send the prediction request to the Vertex AI endpoint:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "b1617d6e8a3d"
},
"outputs": [],
"source": [
"! curl -X POST \\\n",
" -H \"Authorization: Bearer $(gcloud auth print-access-token)\" \\\n",
" -H \"Content-Type: application/json\" \\\n",
" https://{REGION}-aiplatform.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/endpoints/{endpoint.name}:predict \\\n",
" -d \"@instances.json\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "1169ce7bd4c8"
},
"source": [
"## Cleaning up\n",
"\n",
"To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud\n",
"project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n",
"\n",
"Otherwise, you can delete the individual resources you created in this tutorial:\n",
"\n",
"- Vertex AI Pipeline\n",
"- Vertex AI Endpoint\n",
"- Vertex AI Model\n",
"- Vertex AI Experiment\n",
"- Artifact Repository\n",
"- Cloud Storage bucket\n",
"- Local src, build and cloned repo folders"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "UovhFyLeelQe"
},
"outputs": [],
"source": [
"# Delete pipeline\n",
"pipeline.delete()\n",
"\n",
"# Delete endpoints\n",
"endpoint_list = vertex_ai.Endpoint.list(filter=f'display_name=\"{MODEL_NAME}\"')\n",
"for endpoint in endpoint_list:\n",
" endpoint.undeploy_all()\n",
" endpoint.delete()\n",
"\n",
"# Delete model\n",
"model_list = vertex_ai.Model.list(filter=f'display_name=\"{MODEL_NAME}\"')\n",
"for model in model_list:\n",
" model.delete()\n",
"\n",
"# Delete experiment\n",
"experiment.delete()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "e4a76167aee4"
},
"outputs": [],
"source": [
"# Delete the Artifact repository\n",
"! gcloud artifacts repositories delete $REPO_NAME --location=$REGION --quiet"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "233d087dda59"
},
"source": [
"Set `delete_bucket` to **True** to delete the Cloud Storage bucket used in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "sx_vKniMq9ZX"
},
"outputs": [],
"source": [
"# Delete the Cloud Storage bucket\n",
"delete_bucket = False\n",
"if delete_bucket or os.getenv(\"IS_TESTING\"):\n",
" ! gsutil -m rm -r $BUCKET_URI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "bc7f1247f0ec"
},
"outputs": [],
"source": [
"# remove the local src, build and repo folders\n",
"!rm -rf $SRC $BUILD_PATH cloud-builders-community vertex-ai-spark-ml-serving"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [],
"name": "google_cloud_pipeline_components_dataproc_tabular.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 0
}