In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Loan eligibility prediction using `google-cloud-pipeline-components` and Spark ML

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_dataproc_tabular.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_dataproc_tabular.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
<a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/google_cloud_pipeline_components_dataproc_tabular.ipynb" target='_blank'>
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>
</table>

**_NOTE_**: This notebook has been tested in the following environment:

* Python version = 3.9

## Overview

This notebook shows how to build a Spark ML pipeline using Spark MLlib and the `DataprocPySparkBatchOp` component to determine a customer's eligibility for a loan from a banking company. The pipeline covers the full Spark MLlib workflow, from data preprocessing to hyperparameter tuning of a random forest classifier that predicts the probability that a customer is eligible for a loan.

Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) and [Dataproc components](https://cloud.google.com/vertex-ai/docs/pipelines/dataproc-component).

### Objective

In this notebook, you learn how to build a Vertex AI pipeline and train a random forest model using Spark ML for a loan-eligibility classification problem.

This tutorial uses the following Google Cloud ML services and resources:

- Vertex AI Datasets
- Vertex AI Pipelines
- Vertex AI Training


The steps performed include:

*   Use the `DataprocPySparkBatchOp` to preprocess data.
*   Create a Vertex AI dataset resource on the training data.
*   Train a random forest model using PySpark.
*   Build a Vertex AI pipeline and run the training job.
*   Use the Spark serving image to deploy a Spark model to a Vertex AI endpoint.

### Dataset

The dataset is a preprocessed version of the [loan eligibility dataset](https://datasetsearch.research.google.com/search?src=2&query=Loan%20Eligible%20Dataset&docid=L2cvMTFsajJrM3EzcA%3D%3D).

### Costs 

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage
* Dataproc Serverless

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), [Dataproc
pricing](https://cloud.google.com/dataproc/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Installation

Install the packages required for executing this notebook.

In [None]:
import os

# (optional) update gcloud if needed
if os.getenv("IS_TESTING"):
    ! gcloud components update --quiet


! pip3 install --upgrade --quiet google-cloud-aiplatform==1.30.1 \
                                 kfp==1.8.14 \
                                 google-cloud-pipeline-components==1.0.33 --no-warn-conflicts

### Colab only: Uncomment the following cell to restart the kernel.

In [None]:
# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

## Before you begin

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

3. [Enable the Artifact Registry, Cloud Build, Container Registry, Dataproc and Vertex AI APIs](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,artifactregistry.googleapis.com,cloudbuild.googleapis.com,containerregistry.googleapis.com,dataproc.googleapis.com,aiplatform.googleapis.com).

4. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk).

#### Set your project ID

**If you don't know your project ID**, try the following:
* Run `gcloud config list`.
* Run `gcloud projects list`.
* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)

In [None]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [None]:
REGION = "us-central1"  # @param {type: "string"}

#### UUID

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions with other users, generate a UUID for each session and append it to the names of the resources you create in this tutorial.

In [None]:
import random
import string


# Generate a random string of lowercase letters and digits of the specified length (default=8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


UUID = generate_uuid()

### Authenticate your Google Cloud account

Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.

**1. Vertex AI Workbench**
* Do nothing as you are already authenticated.

**2. Local JupyterLab instance, uncomment and run:**

In [None]:
# ! gcloud auth login

**3. Colab, uncomment and run:**

In [None]:
# from google.colab import auth
# auth.authenticate_user()

**4. Service account or other**
* See how to grant Cloud Storage permissions to your service account at https://cloud.google.com/storage/docs/gsutil/commands/iam#ch-examples.

### Enable Google Cloud services

Enable the following services in your project if not already done:

* Artifact Registry
* Cloud Build
* Container Registry
* Dataproc
* Vertex AI


In [None]:
! gcloud services enable \
    artifactregistry.googleapis.com \
    cloudbuild.googleapis.com \
    containerregistry.googleapis.com \
    dataproc.googleapis.com \
    aiplatform.googleapis.com

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

#### Service Account

You use a service account to create Vertex AI Pipeline jobs. If you do not want to use your project's Compute Engine service account, set `SERVICE_ACCOUNT` to another service account ID.

In [None]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
import os
import sys

IS_COLAB = "google.colab" in sys.modules

if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run this step once per service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

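### Load the preprocessed data

This notebook uses a preprocessed snapshot of customer features read from Vertex AI Feature Store. The following cell copies the snapshot from a public Cloud Storage bucket to your bucket.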

In [None]:
PUBLIC_DATA_URI = "gs://cloud-samples-data/vertex-ai/dataset-management/datasets/loan_eligibilty/data.csv"
FEATURES_TRAIN_URI = f"{BUCKET_URI}/data/features/snapshots/{UUID}"

!gsutil cp -r $PUBLIC_DATA_URI $FEATURES_TRAIN_URI

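### Enable Private Google Access for Dataproc Serverless

Dataproc Serverless runs Spark workloads on a VPC subnetwork that must have Private Google Access enabled. The following cell lists the subnetwork, enables Private Google Access on it, and then verifies the setting.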

In [None]:
SUBNETWORK = "default"  # @param {type:"string"}

# Show the subnetwork in your region
!gcloud compute networks subnets list --regions=$REGION --filter=$SUBNETWORK

# Enable Private Google Access on the subnetwork
!gcloud compute networks subnets update $SUBNETWORK \
--region=$REGION \
--enable-private-ip-google-access

# Verify that privateIpGoogleAccess is now enabled
!gcloud compute networks subnets describe $SUBNETWORK \
--region=$REGION \
--format="get(privateIpGoogleAccess)"

### Create the Docker repository

Create a Docker repository in Artifact Registry to store the custom container images that you build in this tutorial, such as the Spark ML serving image.
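Images in this repository are addressed with Artifact Registry's `LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE:TAG` naming scheme, which is how the `SERVING_IMAGE_URI` constant later in this notebook is built. The following minimal sketch composes such a URI; the `artifact_registry_image_uri` helper and the `my-project` project ID are hypothetical placeholders, not part of this notebook.

```python
# Sketch: compose an Artifact Registry Docker image URI.
# The helper name is hypothetical; the URI layout follows Artifact Registry's naming scheme.


def artifact_registry_image_uri(region: str, project_id: str, repo: str,
                                image: str, tag: str = "") -> str:
    """Return LOCATION-docker.pkg.dev/PROJECT/REPOSITORY/IMAGE[:TAG]."""
    uri = f"{region}-docker.pkg.dev/{project_id}/{repo}/{image}"
    # Append the tag only when one is given; Docker treats a missing tag as "latest".
    return f"{uri}:{tag}" if tag else uri


print(artifact_registry_image_uri("us-central1", "my-project",
                                  "loan-eligibility-spark-demo", "spark-ml-serving"))
```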

In [None]:
# set repo name
REPO_NAME = "loan-eligibility-spark-demo"

# create the repository
!gcloud artifacts repositories create $REPO_NAME \
    --repository-format=docker \
    --location=$REGION \
    --quiet \
    --description="loan eligibility spark docker repository"

### Import libraries and define constants

In [1]:
# General
from pathlib import Path as path
from typing import NamedTuple

from google.cloud import aiplatform as vertex_ai
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (ClassificationMetrics, Condition, Metrics, Output,
                        component)

In [None]:
# Setup
DATAPROC_RUNTIME_VERSION = "1.1.20"
SRC = path("src")
BUILD_PATH = path("build")
DELIVERABLES = path("deliverables")
DATA_PATH = path("data")
RUNTIME_IMAGE = "dataproc_serverless_custom_runtime"
IMAGE_TAG = "1.0.0"

# Pipeline
PIPELINE_NAME = "pyspark-loan-eligibility-pipeline"
PIPELINE_ROOT = f"{BUCKET_URI}/pipelines"
PIPELINE_PACKAGE_PATH = str(BUILD_PATH / f"pipeline_{UUID}.json")
RUNTIME_CONTAINER_IMAGE = f"gcr.io/{PROJECT_ID}/{RUNTIME_IMAGE}:{IMAGE_TAG}"
SUBNETWORK_URI = f"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{SUBNETWORK}"
ML_APPLICATION = "loan-eligibility"
TASK = "sparkml"
MODEL_TYPE = "rfor"
VERSION = "1.0.0"
MODEL_NAME = f"{ML_APPLICATION}-{TASK}-{MODEL_TYPE}-{VERSION}"
ARTIFACT_URI = f"{BUCKET_URI}/deliverables/bundle/{UUID}"

# Preprocessing
PREPROCESSING_PYTHON_FILE_URI = f"{BUCKET_URI}/src/data_preprocessing.py"
PROCESSED_DATA_URI = f"{BUCKET_URI}/data/processed"
PREPROCESSING_ARGS = [
    "--train-data-path",
    FEATURES_TRAIN_URI,
    "--out-process-path",
    PROCESSED_DATA_URI,
]

# Training
TRAINING_PYTHON_FILE_URI = f"{BUCKET_URI}/src/model_training.py"
MODEL_URI = f"{BUCKET_URI}/deliverables/model/rfor/{UUID}/train_model"
METRICS_URI = f"{BUCKET_URI}/deliverables/metrics/rfor/{UUID}/train_metrics.json"
TRAINING_ARGS = [
    "--train-path",
    PROCESSED_DATA_URI,
    "--model-path",
    MODEL_URI,
    "--metrics-path",
    METRICS_URI,
]

# Condition
AUPR_THRESHOLD = 0.5
AUPR_HYPERTUNE_CONDITION = "hypertune"

# Hypertuning
HPT_PYTHON_FILE_URI = f"{BUCKET_URI}/src/hp_tuning.py"
HPT_MODEL_URI = f"{BUCKET_URI}/deliverables/model/rfor/{UUID}/model"
HPT_METRICS_URI = f"{BUCKET_URI}/deliverables/metrics/rfor/{UUID}/metrics.json"
HPT_BUNDLE_URI = f"{ARTIFACT_URI}/model.zip"
HPT_ARGS = [
    "--train-path",
    PROCESSED_DATA_URI,
    "--model-path",
    HPT_MODEL_URI,
    "--metrics-path",
    HPT_METRICS_URI,
    "--bundle-path",
    HPT_BUNDLE_URI,
]
HPT_RUNTIME_PROPERTIES = {
    "spark.jars.packages": "ml.combust.mleap:mleap-spark-base_2.12:0.21.1,ml.combust.mleap:mleap-spark_2.12:0.21.1"
}

# Experiment
EXPERIMENT_NAME = "loan-eligibility"

# Deploy
SERVING_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/spark-ml-serving"

### Initialize the Vertex AI SDK client

In [None]:
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_URI,
    experiment=EXPERIMENT_NAME,
)

## Build the Vertex AI pipeline to train and deploy a Spark model

In this case, the ML pipeline includes the following steps:

1.   Preprocess the data (cast variable types) with `DataprocPySparkBatchOp`.
2.   Train a `RandomForestClassifier` with `DataprocPySparkBatchOp`.
3.   Run a custom component to evaluate the model.

If the model meets the performance condition (based on its area under the precision-recall curve and `AUPR_THRESHOLD`), then:

4.   Hypertune the `RandomForestClassifier` with `DataprocPySparkBatchOp`.
5.   Serialize the model to MLeap format so that it can be used outside of Spark.

If the `deploy_model` pipeline parameter is set to `True`:

6.   Upload the model to Vertex AI Model Registry.
7.   Create a Vertex AI endpoint.
8.   Deploy the model to the Vertex AI endpoint to serve online prediction requests.
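The branching logic of these steps can be sketched in plain Python, outside of KFP, to make the control flow explicit. This is an illustration only: the step names and the `planned_steps` function are hypothetical, and the sketch assumes that the AUPR check is a strict `>` comparison against `AUPR_THRESHOLD` and that the deployment steps run only inside the performance branch, since they need the MLeap bundle produced in step 5.

```python
from typing import List


def planned_steps(aupr: float, deploy_model: bool, threshold: float = 0.5) -> List[str]:
    """Return the (hypothetical) names of the pipeline steps that would run."""
    # Steps 1-3 always run.
    steps = ["preprocess", "train", "evaluate"]
    # Assumption: the performance condition is a strict comparison with AUPR_THRESHOLD.
    if aupr > threshold:
        steps += ["hypertune", "serialize_mleap"]
        # Assumption: deployment is nested inside the performance branch.
        if deploy_model:
            steps += ["upload_model", "create_endpoint", "deploy_model"]
    return steps


print(planned_steps(0.42, deploy_model=True))
print(planned_steps(0.71, deploy_model=True))
```

For example, `planned_steps(0.42, deploy_model=True)` returns only the first three steps, because the model does not clear the threshold, so the `deploy_model` flag has no effect.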


### Define the code for PySpark jobs

Define the code for data preprocessing, model training, and hyperparameter tuning.

Initialize a source directory for the code.

In [None]:
# make a source directory to save the code (-p avoids an error if it already exists)
! mkdir -p $SRC
! echo "" > $SRC/__init__.py

#### Create the source code for data-preprocessing

Create the `data_preprocessing.py` file, which ingests the data, preprocesses it for training, and uploads the processed data to Cloud Storage. The script creates a Spark session with logging enabled, drops the Feature Store identifier columns (`timestamp` and `entity_type_customer_id`), and converts the `label` and `loan_amount` columns from string to double. The script accepts the following arguments:

- `--train-data-path`: The Cloud Storage path of the training data.
- `--out-process-path`: The path to save the processed data.
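To make the row-level transformation concrete, here is a stdlib-only sketch of what the preprocessing does, without Spark. It assumes Spark's default (non-ANSI) cast behavior, under which a string that cannot be parsed as a number becomes null; `None` plays that role here. The `preprocess` and `to_double` helpers and the sample rows are hypothetical, and the sample omits the `feature_*` columns for brevity.

```python
# Stdlib-only sketch of the row-level preprocessing in data_preprocessing.py.
# Assumption: mirrors Spark's default (non-ANSI) cast, where an unparsable
# string becomes null; None plays that role here.
import csv
import io
from typing import Dict, List, Optional

FEATURE_STORE_IDS = ["timestamp", "entity_type_customer_id"]
COLUMNS_TO_CAST = ["label", "loan_amount"]


def to_double(value: str) -> Optional[float]:
    """Cast a string to float, or return None if it is not numeric."""
    try:
        return float(value)
    except ValueError:
        return None


def preprocess(csv_text: str) -> List[Dict[str, object]]:
    """Drop the Feature Store ID columns and cast label/loan_amount to float."""
    rows: List[Dict[str, object]] = []
    for raw_row in csv.DictReader(io.StringIO(csv_text)):
        row: Dict[str, object] = {k: v for k, v in raw_row.items() if k not in FEATURE_STORE_IDS}
        for name in COLUMNS_TO_CAST:
            row[name] = to_double(raw_row[name])
        rows.append(row)
    return rows


# Hypothetical sample rows
sample = (
    "label,loan_amount,loan_term,property_area,timestamp,entity_type_customer_id\n"
    "1,128.0,360,Urban,2021-01-01,c1\n"
    "0,n/a,180,Rural,2021-01-01,c2\n"
)
for r in preprocess(sample):
    print(r)
```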

In [None]:
%%writefile $SRC/data_preprocessing.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
data_preprocessing.py is the module that:

  - ingests the data
  - runs simple preprocessing tasks
  - uploads the processed data to Cloud Storage
"""

# Libraries --------------------------------------------------------------------------------
import logging
import argparse
from pathlib import Path
import sys

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as error:
    print('WARN: Something wrong with pyspark library. Please check configuration settings!')
    print(error)

from pyspark.sql.types import StructType, DoubleType, StringType

# Variables --------------------------------------------------------------------------------
DATA_SCHEMA = (StructType()
               .add("label", StringType(), True)
               .add("loan_amount", StringType(), True)
               .add("loan_term", StringType(), True)
               .add("property_area", StringType(), True)
               .add("timestamp", StringType(), True)
               .add("entity_type_customer_id", StringType(), True)
               .add("feature_7", DoubleType(), True)
               .add("feature_3", DoubleType(), True)
               .add("feature_1", DoubleType(), True)
               .add("feature_9", DoubleType(), True)
               .add("feature_5", DoubleType(), True)
               .add("feature_0", DoubleType(), True)
               .add("feature_8", DoubleType(), True)
               .add("feature_4", DoubleType(), True)
               .add("feature_2", DoubleType(), True)
               .add("feature_6", DoubleType(), True)
               )

ENTITY_CUSTOMER_ID = 'entity_type_customer_id'
FEATURE_STORE_IDS = ['timestamp', 'entity_type_customer_id']
CATEGORICAL_VARIABLES = ['loan_term', 'property_area']
IDX_CATEGORICAL_FEATURES = [f'{col}_idx' for col in CATEGORICAL_VARIABLES]
TARGET = 'label'


# Helpers ----------------------------------------------------------------------------------

def set_logger():
    """
    Set logger for the module
    Returns:
        logger: logger object
    """
    fmt_pattern = "%(asctime)s — %(name)s — %(levelname)s —" "%(funcName)s:%(lineno)d — %(message)s"
    main_logger = logging.getLogger(__name__)
    main_logger.setLevel(logging.INFO)
    main_logger.propagate = False
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(fmt_pattern)
    stream_handler.setFormatter(formatter)
    main_logger.addHandler(stream_handler)
    return main_logger


def get_args():
    """
    Get arguments from command line
    Returns:
        args: arguments from command line
    """
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument(
        '--train-data-path',
        help='The GCS path of training sample',
        type=str,
        required=True)
    args_parser.add_argument(
        '--out-process-path',
        help='''
        The path to save the processed data.
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=True)
    return args_parser.parse_args()


# Main -------------------------------------------------------------------------------------

def main(logger, args):
    """
    Main function
    Args:
        logger: logger object
        args: arguments from command line
    Returns:
        None
    """
    # variables
    train_data_path = args.train_data_path
    output_data_path = args.out_process_path

    logger.info('initializing data preprocessing.')
    logger.info('start spark session.')

    spark = (SparkSession.builder
             .master("local[*]")
             .appName("loan eligibility")
             .getOrCreate())
    try:
        logger.info(f'spark version: {spark.sparkContext.version}')
        logger.info('start ingesting data.')

        training_data_raw_df = (spark.read.option("header", True)
                                .option("delimiter", ',')
                                .schema(DATA_SCHEMA)
                                .csv(train_data_path)
                                .drop(*FEATURE_STORE_IDS))

        training_data_raw_df = training_data_raw_df.withColumn("label",
                                                               training_data_raw_df.label.cast('double'))
        training_data_raw_df = training_data_raw_df.withColumn("loan_amount",
                                                               training_data_raw_df.loan_amount.cast('double'))
        training_data_raw_df.show(truncate=False)

        logger.info(f'load prepared data to {output_data_path}.')
        training_data_raw_df.write.mode('overwrite').csv(str(output_data_path), header=True)
    except RuntimeError as main_error:
        logger.error(main_error)
        # re-raise so that the batch job, and the pipeline step, fail visibly
        raise
    else:
        logger.info('data preprocessing successfully completed!')
        return 0


if __name__ == "__main__":
    runtime_args = get_args()
    runtime_logger = set_logger()
    main(runtime_logger, runtime_args)

#### Create the source code for model-training

Create the `model_training.py` file to train a random forest classifier on the training data with Spark ML. The script reads the processed data from Cloud Storage, splits it into training (80%) and test (20%) sets, fits a Spark ML pipeline (string indexing of the categorical variables, feature assembly and scaling, and a random forest classifier) on the training set, and evaluates it on the test set. It then saves the trained model and its metrics (area under the ROC and precision-recall curves, accuracy, F1, weighted precision, and weighted recall) to the provided Cloud Storage paths. This code accepts the following arguments:

- `--train-path`: The Cloud Storage path of the training sample.
- `--model-path`: The Cloud Storage path to store the trained model.
- `--metrics-path`: The Cloud Storage path to store the metrics of the model.
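The `true` and `score` lists that `get_metrics` stores alongside the summary metrics are enough to recompute the area under the ROC curve. The following sketch uses the rank-based (Mann-Whitney) formulation: the fraction of positive/negative pairs in which the positive example gets the higher score, with ties counting as half. It is an illustration, not Spark's implementation; `BinaryClassificationEvaluator` computes the curve itself, so results on real data can differ slightly. The `area_under_roc` function and the sample values are hypothetical.

```python
# Pure-Python sketch: area under the ROC curve from (true, score) pairs,
# using the rank-based (Mann-Whitney) formulation rather than Spark's code.
from typing import Sequence


def area_under_roc(true: Sequence[float], score: Sequence[float]) -> float:
    positives = [s for t, s in zip(true, score) if t == 1.0]
    negatives = [s for t, s in zip(true, score) if t == 0.0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    # Count positive/negative pairs ranked correctly; ties count as half.
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical labels and positive-class probabilities
true = [1.0, 0.0, 1.0, 0.0, 1.0]
score = [0.9, 0.3, 0.6, 0.7, 0.8]
print(round(area_under_roc(true, score), 5))
```

Here five of the six positive/negative pairs are ordered correctly (only 0.6 vs 0.7 is not), so the area is 5/6.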

In [None]:
%%writefile $SRC/model_training.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
model_training.py is the module for training spark pipeline
"""

# Libraries --------------------------------------------------------------------------------
import logging
import sys
import argparse
from pathlib import Path as path
import tempfile
import json
from urllib.parse import urlparse

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print('WARN: Something wrong with pyspark library. Please check configuration settings!')
    print(e)

from pyspark.sql.types import StructType, DoubleType, StringType
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import round as spark_round
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

from google.cloud import storage

# Variables --------------------------------------------------------------------------------

# Data schema
DATA_SCHEMA = (StructType()
               .add("label", DoubleType(), True)
               .add("loan_amount", DoubleType(), True)
               .add("loan_term", StringType(), True)
               .add("property_area", StringType(), True)
               .add("feature_7", DoubleType(), True)
               .add("feature_3", DoubleType(), True)
               .add("feature_1", DoubleType(), True)
               .add("feature_9", DoubleType(), True)
               .add("feature_5", DoubleType(), True)
               .add("feature_0", DoubleType(), True)
               .add("feature_8", DoubleType(), True)
               .add("feature_4", DoubleType(), True)
               .add("feature_2", DoubleType(), True)
               .add("feature_6", DoubleType(), True)
               )

# Training
TARGET = 'label'
CATEGORICAL_VARIABLES = ['loan_term', 'property_area']
IDX_CATEGORICAL_FEATURES = [f'{col}_idx' for col in CATEGORICAL_VARIABLES]
REAL_TIME_FEATURES_VECTOR = 'real_time_features_vector'
REAL_TIME_FEATURES = 'real_time_features'
FEATURES_SELECTED = ['feature_0', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5',
                     'feature_6', 'feature_7', 'feature_8', 'feature_9', 'real_time_features']
FEATURES = 'features'
RANDOM_SEED = 8
RANDOM_QUOTAS = [0.8, 0.2]


# Helpers ----------------------------------------------------------------------------------
def set_logger():
    """
    Set logger
    Returns:
        logger: logger
    """
    fmt_pattern = "%(asctime)s — %(name)s — %(levelname)s —" "%(funcName)s:%(lineno)d — %(message)s"
    main_logger = logging.getLogger(__name__)
    main_logger.setLevel(logging.INFO)
    main_logger.propagate = False
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(fmt_pattern)
    stream_handler.setFormatter(formatter)
    main_logger.addHandler(stream_handler)
    return main_logger


def get_args():
    """
    Get arguments
    Returns:
        args: arguments
    """
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument(
        '--train-path',
        help='''
        The GCS path of training data.
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=True)
    args_parser.add_argument(
        '--model-path',
        help='''
        The GCS path to store the trained model. 
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=True)
    args_parser.add_argument(
        '--metrics-path',
        help='''
        The GCS path to store the metrics of model. 
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=True)
    return args_parser.parse_args()


def build_preprocessing_components():
    """
    Build preprocessing components
    Returns:
        data_preprocessing_stages: data preprocessing stages
    """
    loan_term_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[0], outputCol=IDX_CATEGORICAL_FEATURES[0],
                                      stringOrderType='frequencyDesc', handleInvalid='keep')
    property_area_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[1], outputCol=IDX_CATEGORICAL_FEATURES[1],
                                          stringOrderType='frequencyDesc', handleInvalid='keep')
    data_preprocessing_stages = [loan_term_indexer, property_area_indexer]
    return data_preprocessing_stages


def build_feature_engineering_components():
    """
    Build feature engineering components
    Returns:
        feature_engineering_stages: feature engineering stages
    """
    feature_engineering_stages = []
    realtime_vector_assembler = VectorAssembler(inputCols=IDX_CATEGORICAL_FEATURES, outputCol=REAL_TIME_FEATURES_VECTOR)
    realtime_scaler = StandardScaler(inputCol=REAL_TIME_FEATURES_VECTOR, outputCol=REAL_TIME_FEATURES)
    features_vector_assembler = VectorAssembler(inputCols=FEATURES_SELECTED, outputCol=FEATURES)
    feature_engineering_stages.extend((realtime_vector_assembler,
                                       realtime_scaler,
                                       features_vector_assembler))
    return feature_engineering_stages


def build_training_model_component():
    """
    Build training model component
    Returns:
        model_training_stage: model_training_stage
    """
    model_training_stage = []
    rfor = RandomForestClassifier(featuresCol=FEATURES, labelCol=TARGET, seed=RANDOM_SEED)
    model_training_stage.append(rfor)
    return model_training_stage


def build_pipeline(data_preprocessing_stages, feature_engineering_stages, model_training_stage):
    """
    Build pipeline
    Args:
        data_preprocessing_stages:  data preprocessing stages
        feature_engineering_stages: feature engineering stages
        model_training_stage: model_training_stage
    Returns:
        pipeline: pipeline
    """
    pipeline = Pipeline(stages=data_preprocessing_stages + feature_engineering_stages + model_training_stage)
    return pipeline


def get_true_score_prediction(predictions, target):
    """
    Get true score prediction
    Args:
        predictions: predictions
        target: target
    Returns:
        roc_dict: a dict of roc values for each class
    """
    split1_udf = udf(lambda value: value[1].item(), DoubleType())
    roc_dataset = predictions.select(col(target).alias('true'),
                                     spark_round(split1_udf('probability'), 5).alias('score'),
                                     'prediction')
    roc_df = roc_dataset.toPandas()
    roc_dict = roc_df.to_dict(orient='list')
    return roc_dict


def get_metrics(predictions, target, mode):
    """
    Get metrics
    Args:
        predictions: predictions
        target: target column name
        mode: train or test
    Returns:
        metrics: metrics
    """
    metric_labels = ['area_roc', 'area_prc', 'accuracy', 'f1', 'precision', 'recall']
    metric_cols = ['true', 'score', 'prediction']
    metric_keys = [f'{mode}_{ml}' for ml in metric_labels] + metric_cols
    bc_evaluator = BinaryClassificationEvaluator(labelCol=target)
    mc_evaluator = MulticlassClassificationEvaluator(labelCol=target)

    # areas, acc, f1, prec, rec
    metric_values = []
    area_roc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderROC'}), 5)
    area_prc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderPR'}), 5)
    acc = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "accuracy"}), 5)
    f1 = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "f1"}), 5)
    prec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "weightedPrecision"}), 5)
    rec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "weightedRecall"}), 5)

    # true, score, prediction
    roc_dict = get_true_score_prediction(predictions, target)
    true = roc_dict['true']
    score = roc_dict['score']
    pred = roc_dict['prediction']

    metric_values.extend((area_roc, area_prc, acc, f1, prec, rec, true, score, pred))
    metrics = dict(zip(metric_keys, metric_values))

    return metrics


def upload_file(bucket_name, source_file_name, destination_blob_name):
    """
    Upload file to bucket
    Args:
        bucket_name: bucket name
        source_file_name: source file name
        destination_blob_name: destination blob name
    Returns:
        None
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)


def write_metrics(bucket_name, metrics, destination, dir='/tmp'):
    """
    Write metrics to a temporary JSON file and upload it to the bucket
    Args:
        bucket_name: bucket name
        metrics: metrics
        destination: destination blob name
        dir: directory to write file temporarily
    Returns:
        None
    """
    # the context manager removes the temporary directory even if the upload fails
    with tempfile.TemporaryDirectory(dir=dir) as temp_dir_name:
        temp_metrics_file_path = str(path(temp_dir_name) / path(destination).name)
        with open(temp_metrics_file_path, 'w') as temp_file:
            json.dump(metrics, temp_file)
        upload_file(bucket_name, temp_metrics_file_path, destination)


# Main -------------------------------------------------------------------------------------

def main(logger, args):
    """
    Main function
    Args:
        logger: logger
        args: args
    Returns:
        None
    """
    train_path = args.train_path
    model_path = args.model_path
    metrics_path = args.metrics_path

    try:
        logger.info('initializing pipeline training.')
        logger.info('start spark session.')
        spark = (SparkSession.builder
                 .master("local[*]")
                 .appName("loan eligibility")
                 .getOrCreate())
        logger.info(f'spark version: {spark.sparkContext.version}')
        logger.info('start building pipeline.')
        preprocessing_stages = build_preprocessing_components()
        feature_engineering_stages = build_feature_engineering_components()
        model_training_stage = build_training_model_component()
        pipeline = build_pipeline(preprocessing_stages, feature_engineering_stages, model_training_stage)

        logger.info(f'load train data from {train_path}.')
        raw_data = (spark.read.format('csv')
                    .option("header", "true")
                    .schema(DATA_SCHEMA)
                    .load(train_path))

        logger.info(f'fit model pipeline.')
        train, test = raw_data.randomSplit(RANDOM_QUOTAS, seed=RANDOM_SEED)
        pipeline_model = pipeline.fit(train)
        predictions = pipeline_model.transform(test)
        metrics = get_metrics(predictions, TARGET, 'test')
        for m, v in metrics.items():
            print(f'{m}: {v}')

        logger.info(f'save model pipeline to {model_path}.')
        pipeline_model.write().overwrite().save(model_path)

        logger.info(f'upload metrics under {metrics_path}.')
        bucket = urlparse(metrics_path).netloc
        metrics_file_path = urlparse(metrics_path).path.strip('/')
        write_metrics(bucket, metrics, metrics_file_path)
        
    except RuntimeError as main_error:
        logger.error(main_error)
    else:
        logger.info('model pipeline training successfully completed!')
        return 0


if __name__ == "__main__":
    runtime_args = get_args()
    runtime_logger = set_logger()
    main(runtime_logger, runtime_args)

#### Create the source code for hyperparameter tuning

Create the `hp_tuning.py` file for tuning the hyperparameters of the random-forest classifier model using cross-validation. This code accepts the following arguments:

- `--train-path`: The GCS path of the training sample.
- `--model-path`: The GCS path to store the trained model.
- `--metrics-path`: The GCS path to store the metrics of the model.
- `--bundle-path`: The GCS path to store the exported MLeap bundle.

The hyperparameter tuning job also serializes the best-performing model to an MLeap bundle, which can be imported into Vertex AI as a model for serving predictions; see the *(Optional) Serve your model using Vertex AI* section further below.
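The cost of the grid search is easy to underestimate. The following plain-Python sketch (no Spark required) enumerates the same grid that `build_hp_pipeline()` in `hp_tuning.py` builds from `MAX_DEPTH`, `MAX_BINS`, and `N_TREES`, and counts how many random-forest fits `CrossValidator` performs with `N_FOLDS` folds. It models each candidate as a dictionary rather than a Spark `ParamMap`, so treat it as an illustration of the arithmetic only.

```python
from itertools import product

# Grid values copied from hp_tuning.py.
MAX_DEPTH = [5, 10, 15]
MAX_BINS = [24, 32, 40]
N_TREES = [25, 30, 35]
N_FOLDS = 5

# Every combination of the three hyperparameters is one candidate,
# mirroring what ParamGridBuilder.build() enumerates.
param_grid = [
    {"maxDepth": d, "maxBins": b, "numTrees": t}
    for d, b, t in product(MAX_DEPTH, MAX_BINS, N_TREES)
]

# CrossValidator fits each candidate once per fold, then refits the
# best candidate once on the full training split.
cv_fits = len(param_grid) * N_FOLDS
total_fits = cv_fits + 1

print(f"candidates: {len(param_grid)}")
print(f"cross-validation fits: {cv_fits}")
print(f"total fits including final refit: {total_fits}")
```

Because the count is a product, shrinking any one of the lists reduces the run time multiplicatively.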

In [None]:
%%writefile $SRC/hp_tuning.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
hp_tuning.py is the module for tuning the hyperparameters of the Spark pipeline
"""

# Libraries --------------------------------------------------------------------------------
import logging
import sys
import argparse
from os import environ
from datetime import datetime
from pathlib import Path as path
import tempfile
from urllib.parse import urlparse, urljoin
import json

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print('WARN: Something wrong with pyspark library. Please check configuration settings!')
    print(e)
    
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

from pyspark.sql.types import StructType, DoubleType, StringType
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import round as spark_round
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

from google.cloud import storage

# Variables --------------------------------------------------------------------------------

# Data schema
DATA_SCHEMA = (StructType()
               .add("label", DoubleType(), True)
               .add("loan_amount", DoubleType(), True)
               .add("loan_term", StringType(), True)
               .add("property_area", StringType(), True)
               .add("feature_7", DoubleType(), True)
               .add("feature_3", DoubleType(), True)
               .add("feature_1", DoubleType(), True)
               .add("feature_9", DoubleType(), True)
               .add("feature_5", DoubleType(), True)
               .add("feature_0", DoubleType(), True)
               .add("feature_8", DoubleType(), True)
               .add("feature_4", DoubleType(), True)
               .add("feature_2", DoubleType(), True)
               .add("feature_6", DoubleType(), True)
               )

# Training
TARGET = 'label'
CATEGORICAL_VARIABLES = ['loan_term', 'property_area']
IDX_CATEGORICAL_FEATURES = [f'{col}_idx' for col in CATEGORICAL_VARIABLES]
REAL_TIME_FEATURES_VECTOR = 'real_time_features_vector'
REAL_TIME_FEATURES = 'real_time_features'
FEATURES_SELECTED = ['feature_0', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5',
                     'feature_6', 'feature_7', 'feature_8', 'feature_9', 'real_time_features']
FEATURES = 'features'
RANDOM_SEED = 8
RANDOM_QUOTAS = [0.8, 0.2]
MAX_DEPTH = [5, 10, 15]
MAX_BINS = [24, 32, 40]
N_TREES = [25, 30, 35]
N_FOLDS = 5


# Helpers ----------------------------------------------------------------------------------
def set_logger():
    """
    Set logger for the module
    Returns:
        logger: logger object
    """
    fmt_pattern = "%(asctime)s — %(name)s — %(levelname)s —" "%(funcName)s:%(lineno)d — %(message)s"
    main_logger = logging.getLogger(__name__)
    main_logger.setLevel(logging.INFO)
    main_logger.propagate = False
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(fmt_pattern)
    stream_handler.setFormatter(formatter)
    main_logger.addHandler(stream_handler)
    return main_logger


def get_args():
    """
    Get arguments from command line
    Returns:
        args: arguments from command line
    """
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument(
        '--train-path',
        help='''
        The GCS path of the training data.
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=False)
    args_parser.add_argument(
        '--model-path',
        help='''
        The GCS path to store the trained model. 
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=False)
    args_parser.add_argument(
        '--metrics-path',
        help='''
        The GCS path to store the metrics of model. 
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=True)
    args_parser.add_argument(
        '--bundle-path',
        help='''
        The GCS path to store the exported MLeap bundle. 
        Format: 
        - locally: /path/to/dir
        - cloud: gs://bucket/path
        ''',
        type=str,
        required=True)
    return args_parser.parse_args()


def build_preprocessing_components():
    """
    Build preprocessing components
    Returns:
        preprocessing_components: preprocessing components
    """
    loan_term_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[0], outputCol=IDX_CATEGORICAL_FEATURES[0],
                                      stringOrderType='frequencyDesc', handleInvalid='keep')
    property_area_indexer = StringIndexer(inputCol=CATEGORICAL_VARIABLES[1], outputCol=IDX_CATEGORICAL_FEATURES[1],
                                          stringOrderType='frequencyDesc', handleInvalid='keep')
    data_preprocessing_stages = [loan_term_indexer, property_area_indexer]
    return data_preprocessing_stages


def build_feature_engineering_components():
    """
    Build feature engineering components
    Returns:
        feature_engineering_components: feature engineering components
    """
    feature_engineering_stages = []
    realtime_vector_assembler = VectorAssembler(inputCols=IDX_CATEGORICAL_FEATURES, outputCol=REAL_TIME_FEATURES_VECTOR)
    realtime_scaler = StandardScaler(inputCol=REAL_TIME_FEATURES_VECTOR, outputCol=REAL_TIME_FEATURES)
    features_vector_assembler = VectorAssembler(inputCols=FEATURES_SELECTED, outputCol=FEATURES)
    feature_engineering_stages.extend((realtime_vector_assembler,
                                       realtime_scaler,
                                       features_vector_assembler))
    return feature_engineering_stages


def build_training_model_component():
    """
    Build training model component
    Returns:
        training_model_component: training model component
    """
    model_training_stage = []
    rfor = RandomForestClassifier(featuresCol=FEATURES, labelCol=TARGET, seed=RANDOM_SEED)
    model_training_stage.append(rfor)
    return model_training_stage


def build_hp_pipeline(data_preprocessing_stages, feature_engineering_stages, model_training_stage):
    """
    Build hyperparameter pipeline
    Args:
        data_preprocessing_stages: preprocessing components
        feature_engineering_stages: feature engineering components
        model_training_stage: training model component
    Returns:
        hp_pipeline: hyperparameter pipeline
    """
    pipeline = Pipeline(stages=data_preprocessing_stages + feature_engineering_stages + model_training_stage)
    params_grid = (ParamGridBuilder()
                   .addGrid(model_training_stage[0].maxDepth, MAX_DEPTH)
                   .addGrid(model_training_stage[0].maxBins, MAX_BINS)
                   .addGrid(model_training_stage[0].numTrees, N_TREES)
                   .build())
    evaluator = BinaryClassificationEvaluator(labelCol=TARGET)
    cross_validator = CrossValidator(estimator=pipeline,
                                     estimatorParamMaps=params_grid,
                                     evaluator=evaluator,
                                     numFolds=N_FOLDS)
    return cross_validator


def get_true_score_prediction(predictions, target):
    """
    Get true score and prediction
    Args:
        predictions: predictions
        target: target column
    Returns:
        roc_dict: a dict of roc values for each class
    """

    split1_udf = udf(lambda value: value[1].item(), DoubleType())
    roc_dataset = predictions.select(col(target).alias('true'),
                                     spark_round(split1_udf('probability'), 5).alias('score'),
                                     'prediction')
    roc_df = roc_dataset.toPandas()
    roc_dict = roc_df.to_dict(orient='list')
    return roc_dict


def get_metrics(predictions, target, mode):
    """
    Get metrics
    Args:
        predictions: predictions
        target: target column
        mode: train or test
    Returns:
        metrics: metrics
    """
    metric_labels = ['area_roc', 'area_prc', 'accuracy', 'f1', 'precision', 'recall']
    metric_cols = ['true', 'score', 'prediction']
    metric_keys = [f'{mode}_{ml}' for ml in metric_labels] + metric_cols

    bc_evaluator = BinaryClassificationEvaluator(labelCol=target)
    mc_evaluator = MulticlassClassificationEvaluator(labelCol=target)

    # areas, acc, f1, prec, rec
    metric_values = []
    area_roc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderROC'}), 5)
    area_prc = round(bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: 'areaUnderPR'}), 5)
    acc = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "accuracy"}), 5)
    f1 = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "f1"}), 5)
    prec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "weightedPrecision"}), 5)
    rec = round(mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "weightedRecall"}), 5)

    # true, score, prediction
    roc_dict = get_true_score_prediction(predictions, target)
    true = roc_dict['true']
    score = roc_dict['score']
    pred = roc_dict['prediction']

    metric_values.extend((area_roc, area_prc, acc, f1, prec, rec, true, score, pred))
    metrics = dict(zip(metric_keys, metric_values))

    return metrics


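def upload_file(bucket_name, source_file_name, destination_blob_name):
    """
    Upload a local file to Cloud Storage
    Args:
        bucket_name: bucket name
        source_file_name: path of the local file to upload
        destination_blob_name: object name in the bucket
    Returns:
        None
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)


def write_metrics(bucket_name, metrics, destination, dir='/tmp'):
    """
    Write metrics to a temporary JSON file and upload it to Cloud Storage
    Args:
        bucket_name: bucket name
        metrics: metrics
        destination: object name in the bucket
        dir: directory to write file temporarily
    Returns:
        None
    """
    temp_dir = tempfile.TemporaryDirectory(dir=dir)
    temp_metrics_file_path = str(path(temp_dir.name) / path(destination).name)
    with open(temp_metrics_file_path, 'w') as temp_file:
        json.dump(metrics, temp_file)
    upload_file(bucket_name, temp_metrics_file_path, destination)
    temp_dir.cleanup()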


# Main -------------------------------------------------------------------------------------

def main(logger, args):
    """
    Main function
    Args:
        logger: logger
        args: args
    Returns:
        None
    """
    train_path = args.train_path
    model_path = args.model_path
    metrics_path = args.metrics_path
    bundle_path = args.bundle_path

    try:
        logger.info('initializing pipeline training.')
        logger.info('start spark session.')
        spark = (SparkSession.builder
                 .master("local[*]")
                 .appName("loan eligibility")
                 .getOrCreate())
        logger.info(f'spark version: {spark.sparkContext.version}')
        logger.info('start building pipeline.')
        preprocessing_stages = build_preprocessing_components()
        feature_engineering_stages = build_feature_engineering_components()
        model_training_stage = build_training_model_component()
        pipeline_cross_validator = build_hp_pipeline(preprocessing_stages, feature_engineering_stages,
                                                     model_training_stage)
        logger.info(f'load train data from {train_path}.')
        raw_data = (spark.read.format('csv')
                        .option("header", "true")
                        .schema(DATA_SCHEMA)
                        .load(train_path))
        logger.info(f'fit model pipeline.')
        train, test = raw_data.randomSplit(RANDOM_QUOTAS, seed=RANDOM_SEED)
        pipeline_model = pipeline_cross_validator.fit(train)
        predictions = pipeline_model.transform(test)
        metrics = get_metrics(predictions, TARGET, 'test')
        for m, v in metrics.items():
            print(f'{m}: {v}')

        logger.info(f'save model pipeline to {model_path}.')
        pipeline_model.write().overwrite().save(model_path)

        logger.info(f'upload metrics under {metrics_path}.')
        bucket = urlparse(metrics_path).netloc
        metrics_file_path = urlparse(metrics_path).path.strip('/')
        write_metrics(bucket, metrics, metrics_file_path)
        
        logger.info('export MLeap bundle to temporary location')
        pipeline_model.bestModel.serializeToBundle(f'jar:file:/tmp/bundle.zip', predictions)
        
        logger.info(f'upload MLeap bundle to {bundle_path}')
        bundle_file_path = urlparse(bundle_path).path.strip('/')
        bucket = urlparse(bundle_path).netloc
        logger.info(f'Copying /tmp/bundle.zip to bucket {bucket} using object name {bundle_file_path} ...')
        upload_file(bucket, '/tmp/bundle.zip', bundle_file_path)
        
    except RuntimeError as main_error:
        logger.error(main_error)
    else:
        logger.info('model pipeline training successfully completed!')
        return 0


if __name__ == "__main__":
    runtime_args = get_args()
    runtime_logger = set_logger()
    main(runtime_logger, runtime_args)

### Upload source code

To run these scripts with the `DataprocPySparkBatchOp` component from `google-cloud-pipeline-components`, first upload them to your Cloud Storage bucket.
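If you prefer to stay in Python rather than call `gsutil`, the following sketch performs the same copy with the `google-cloud-storage` client that the scripts already use in `upload_file()`. The planning step shows how a `gs://` URI splits into a bucket name (`netloc`) and an object name (`path`), the same split that `write_metrics()` relies on. `plan_uploads` and `upload` are hypothetical helpers, not part of this notebook, and `upload` needs valid credentials.

```python
from pathlib import Path
from urllib.parse import urlparse

# Files copied by the gsutil cell in this notebook.
SOURCE_FILES = ["__init__.py", "data_preprocessing.py", "model_training.py", "hp_tuning.py"]


def plan_uploads(src_dir, bucket_uri, file_names, prefix="src"):
    """Return (local_path, bucket_name, object_name) tuples, one per file."""
    parsed = urlparse(bucket_uri)
    if parsed.scheme != "gs":
        raise ValueError(f"expected a gs:// URI, got {bucket_uri!r}")
    bucket_name = parsed.netloc
    # Any path already in the bucket URI becomes the start of the object name.
    base = "/".join(p for p in (parsed.path.strip("/"), prefix) if p)
    return [
        (str(Path(src_dir) / name), bucket_name, f"{base}/{name}")
        for name in file_names
    ]


def upload(plan):
    """Upload each planned file with the Cloud Storage client."""
    from google.cloud import storage  # imported here; only needed for the upload

    client = storage.Client()
    for local_path, bucket_name, object_name in plan:
        client.bucket(bucket_name).blob(object_name).upload_from_filename(local_path)
```

For example, `plan_uploads("src", "gs://my-bucket", SOURCE_FILES)` (with a hypothetical bucket name) maps `src/hp_tuning.py` to the object `src/hp_tuning.py` in `my-bucket`, matching the destinations used by the `gsutil` commands.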

In [None]:
! gsutil cp $SRC/__init__.py $BUCKET_URI/src/__init__.py
! gsutil cp $SRC/data_preprocessing.py $BUCKET_URI/src/data_preprocessing.py
! gsutil cp $SRC/model_training.py $BUCKET_URI/src/model_training.py
! gsutil cp $SRC/hp_tuning.py $BUCKET_URI/src/hp_tuning.py

### Build a custom Dataproc Serverless container image

Dataproc Serverless provides default runtime images. Learn more about the [Dataproc Serverless Spark runtime releases](https://cloud.google.com/dataproc-serverless/docs/concepts/versions/spark-runtime-versions).

You can also use custom container images for your Dataproc Serverless workloads. The steps in this section build a custom container image that includes additional dependencies, such as MLeap. You can specify the custom container image when using the `DataprocPySparkBatchOp` component to launch the workload within a pipeline.

#### Define the Dataproc Serverless custom runtime image

In [None]:
!mkdir -m 777 -p $BUILD_PATH

In [None]:
%%writefile $BUILD_PATH/Dockerfile

# Debian 11 is recommended.
FROM debian:11-slim

# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive

# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini

# (Optional) Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar "${SPARK_EXTRA_JARS_DIR}"

# (Optional) Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict

# (Optional) Install Conda packages.
#
# The following packages are installed in the default image, it is strongly
# recommended to include all of them.
#
# Use mamba solver to install packages quickly.
RUN ${CONDA_HOME}/bin/conda install -n base conda-libmamba-solver
RUN ${CONDA_HOME}/bin/conda install \
      cython \
      fastavro \
      fastparquet \
      gcsfs \
      google-cloud-bigquery-storage \
      google-cloud-bigquery[pandas] \
      google-cloud-dataproc \
      numpy \
      pandas \
      python \
      scikit-image \
      scikit-learn \
      scipy \
      mleap --solver=libmamba

# (Required) Create the 'spark' group/user.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark

#### Download the `spark-bigquery-with-dependencies` jar file and the Miniconda installer

In [None]:
!gsutil cp gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar $BUILD_PATH
!wget -P $BUILD_PATH https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh

#### Build the Dataproc Serverless custom runtime using Cloud Build

**Note:** this step may take up to 20 minutes to complete.

In [None]:
!gcloud builds submit --tag $RUNTIME_CONTAINER_IMAGE $BUILD_PATH --machine-type=N1_HIGHCPU_32 --timeout=3600s --verbosity=info

### Build custom components for pipeline arguments

To pass job arguments to each Dataproc Serverless batch, create a small custom component for each step of the pipeline that returns the argument list for that step.

#### Create component for passing args to preprocessing component

The following component returns the arguments `--train-data-path` and `--out-process-path` as a list, in the format expected by the preprocessing script defined earlier.

In [None]:
@component(base_image="python:3.8-slim")
def build_preprocessing_args(train_data_path: str, processed_data_path: str) -> list:
    return [
        "--train-data-path",
        train_data_path,
        "--out-process-path",
        processed_data_path,
    ]

#### Create component for passing args to training component

The following component returns the arguments `--train-path`, `--model-path`, and `--metrics-path` as a list, in the format expected by the model training script defined earlier.
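To see what the training job receives, the following sketch builds the same flat list in plain Python and parses it with `argparse`, using the option names from the training script. The parser is a simplified stand-in for the script's `get_args()` (help text and `required` settings omitted), the function names `training_argv` and `make_parser` are hypothetical, and the `gs://my-bucket/...` URIs are placeholders.

```python
import argparse


def training_argv(train_path, model_path, metrics_path):
    """Plain-Python equivalent of the list returned by the component."""
    return [
        "--train-path", train_path,
        "--model-path", model_path,
        "--metrics-path", metrics_path,
    ]


def make_parser():
    """Simplified parser with the same option names as the training script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-path", type=str)
    parser.add_argument("--model-path", type=str)
    parser.add_argument("--metrics-path", type=str)
    return parser


argv = training_argv(
    "gs://my-bucket/data/processed",
    "gs://my-bucket/model",
    "gs://my-bucket/metrics/metrics.json",
)
args = make_parser().parse_args(argv)
# argparse turns dashes into underscores: --train-path becomes args.train_path
print(args.train_path, args.model_path, args.metrics_path)
```

Returning a flat list of alternating flags and values is what lets the Dataproc batch hand the arguments straight to the script's command line.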

In [None]:
@component(base_image="python:3.8-slim")
def build_training_args(train_path: str, model_path: str, metrics_path: str) -> list:
    return [
        "--train-path",
        train_path,
        "--model-path",
        model_path,
        "--metrics-path",
        metrics_path,
    ]

#### Create model evaluation custom component

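Define the component that evaluates the trained model. It reads the metrics JSON file that the training job writes to `metrics_uri`, logs the scalar metrics to the `metrics` output artifact and the ROC curve and confusion matrix to the `plots` output artifact, and returns the area under the precision-recall curve as `threshold_metric`.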
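To make the data flow concrete, the following sketch reproduces the confusion-matrix step of `evaluate_model` in plain Python, without scikit-learn, on a small made-up payload that uses the same keys as the metrics file written by `get_metrics()`. It assumes labels and predictions are `0.0`/`1.0`, as in the training data, and uses the same layout as `sklearn.metrics.confusion_matrix` for sorted labels `[0, 1]`: rows are true classes, columns are predicted classes. `confusion_matrix_2x2` is a hypothetical helper.

```python
import json

# Made-up payload with the same keys that get_metrics() writes.
metrics_json = json.dumps({
    "test_area_prc": 0.9,
    "true": [1.0, 0.0, 1.0, 1.0, 0.0],
    "score": [0.91, 0.40, 0.35, 0.77, 0.62],
    "prediction": [1.0, 0.0, 0.0, 1.0, 1.0],
})


def confusion_matrix_2x2(y_true, y_pred):
    """Return [[TN, FP], [FN, TP]] for binary 0/1 labels."""
    matrix = [[0, 0], [0, 0]]
    for t, p in zip(y_true, y_pred):
        matrix[int(t)][int(p)] += 1
    return matrix


metrics_dict = json.loads(metrics_json)
c_matrix = confusion_matrix_2x2(metrics_dict["true"], metrics_dict["prediction"])
labels = ["not eligible", "eligible"]  # index 0 is label 0.0, index 1 is label 1.0
threshold_metric = metrics_dict["test_area_prc"]
print(c_matrix)  # [[1, 1], [1, 2]]
```

The `labels` list passed to `log_confusion_matrix` must follow the same index order as the matrix rows and columns.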

In [None]:
@component(
    base_image="python:3.8",
    packages_to_install=["numpy==1.21.2", "pandas==1.3.3", "scikit-learn==0.24.2"],
)
def evaluate_model(
    metrics_uri: str,
    metrics: Output[Metrics],
    plots: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("threshold_metric", float)]):
    # Libraries --------------------------------------------------------------------------------------------------------------------------
    import json

    import numpy as np
    from sklearn.metrics import confusion_matrix, roc_curve

    # Variables --------------------------------------------------------------------------------------------------------------------------
    metrics_path = metrics_uri.replace("gs://", "/gcs/")
    labels = ["not eligible", "eligible"]

    # Helpers --------------------------------------------------------------------------------------------------------------------------
    def calculate_roc(metrics, true, score):
        y_true_np = np.array(metrics[true])
        y_score_np = np.array(metrics[score])
        fpr, tpr, thresholds = roc_curve(
            y_true=y_true_np, y_score=y_score_np, pos_label=True
        )
        return fpr, tpr, thresholds

    def calculate_confusion_matrix(metrics, true, prediction):
        y_true_np = np.array(metrics[true])
        y_pred_np = np.array(metrics[prediction])
        c_matrix = confusion_matrix(y_true_np, y_pred_np)
        return c_matrix

    # Main -------------------------------------------------------------------------------------------------------------------------------
    with open(metrics_path) as json_file:
        metrics_dict = json.load(json_file)

    area_roc = metrics_dict["test_area_roc"]
    area_prc = metrics_dict["test_area_prc"]
    acc = metrics_dict["test_accuracy"]
    f1 = metrics_dict["test_f1"]
    prec = metrics_dict["test_precision"]
    rec = metrics_dict["test_recall"]

    metrics.log_metric("Test_areaUnderROC", area_roc)
    metrics.log_metric("Test_areaUnderPRC", area_prc)
    metrics.log_metric("Test_Accuracy", acc)
    metrics.log_metric("Test_f1-score", f1)
    metrics.log_metric("Test_Precision", prec)
    metrics.log_metric("Test_Recall", rec)

    fpr, tpr, thresholds = calculate_roc(metrics_dict, "true", "score")
    c_matrix = calculate_confusion_matrix(metrics_dict, "true", "prediction")
    plots.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())
    plots.log_confusion_matrix(labels, c_matrix.tolist())

    component_outputs = NamedTuple(
        "Outputs",
        [
            ("threshold_metric", float),
        ],
    )

    return component_outputs(area_prc)

#### Create component for passing args to hyperparameter tuning component

The following component returns the arguments `--train-path`, `--model-path`, `--metrics-path`, and `--bundle-path` as a list, in the format expected by the hyperparameter tuning script defined earlier.

In [None]:
@component(base_image="python:3.8-slim")
def build_hpt_args(
    train_path: str,
    model_path: str,
    metrics_path: str,
    bundle_path: str,
) -> list:
    return [
        "--train-path",
        train_path,
        "--model-path",
        model_path,
        "--metrics-path",
        metrics_path,
        "--bundle-path",
        bundle_path,
    ]

### (Optional) Serve your model using Vertex AI

The hyperparameter tuning task exports the best performing model as an MLeap bundle. The MLeap bundle can be imported into the Vertex AI Model Registry and used for prediction serving. See [Serving Spark ML model using Vertex AI](https://cloud.google.com/architecture/spark-ml-model-with-vertexai) for more information.

Enable import of the MLeap bundle into the Vertex AI Model Registry and online prediction serving.

In [None]:
# Set DEPLOY_MODEL to True
DEPLOY_MODEL = True

### Build the model serving container image

A *serving container image* is required to import your model into the Model Registry. The serving container image provides the model serving implementation for the model. Learn more about [serving Spark ML models using Vertex AI](https://cloud.google.com/architecture/spark-ml-model-with-vertexai).

**Note:** this step may take approximately 5 to 10 minutes to complete.

In [None]:
DEPLOY_MODEL_CONDITION = 'deploy'

if DEPLOY_MODEL:

    import os
    
    CWD = os.getcwd()

    # Clone and build the scala-sbt cloud builder
    ! git clone https://github.com/GoogleCloudPlatform/cloud-builders-community.git
    ! cd {CWD}/cloud-builders-community/scala-sbt && \
        gcloud builds submit .

    # Clone and build the serving container code
    ! cd {CWD} && git clone https://github.com/GoogleCloudPlatform/vertex-ai-spark-ml-serving.git
    ! cd {CWD}/vertex-ai-spark-ml-serving && \
        gcloud builds submit --config=cloudbuild.yaml \
            --substitutions="_LOCATION={REGION},_REPOSITORY={REPO_NAME},_IMAGE=spark-ml-serving" .

### Create component for importing a model artifact into a pipeline

The pipeline uses the `ModelUploadOp` component to import (upload) a model to the Vertex AI Model Registry.

The `import_model_artifact` Python component creates a model artifact that can be passed to the `ModelUploadOp` component.

In [None]:
@dsl.component(
    base_image="python:3.8-slim",
    packages_to_install=["google-cloud-aiplatform"],
)
def import_model_artifact(
    model: dsl.Output[dsl.Artifact], artifact_uri: str, serving_image_uri: str
):
    model.metadata["containerSpec"] = {
        "imageUri": serving_image_uri,
        "healthRoute": "/health",
        "predictRoute": "/predict",
    }
    model.uri = artifact_uri

### Define the schema for model serving

The serving container requires the model schema in JSON format, which is read during container startup. Learn more about [providing the model schema](https://cloud.google.com/architecture/spark-ml-model-with-vertexai#provide_the_model_schema).
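Once the model is deployed, prediction requests must supply the input fields in a form the container understands. The following sketch assumes a JSON body of the form `{"instances": [[...], ...]}`, where each instance lists values in the order of the schema's `input` fields; check the `vertex-ai-spark-ml-serving` repository for the exact request format. The field list is copied from the schema file written in the next cell (note that the `label` column is not an input), and `to_instance` and the sample values are hypothetical.

```python
import json

# Input fields and types, in the same order as schema.json.
SCHEMA_INPUT = [
    ("loan_amount", "DOUBLE"), ("loan_term", "STRING"), ("property_area", "STRING"),
    ("feature_7", "DOUBLE"), ("feature_3", "DOUBLE"), ("feature_1", "DOUBLE"),
    ("feature_9", "DOUBLE"), ("feature_5", "DOUBLE"), ("feature_0", "DOUBLE"),
    ("feature_8", "DOUBLE"), ("feature_4", "DOUBLE"), ("feature_2", "DOUBLE"),
    ("feature_6", "DOUBLE"),
]

CASTS = {"DOUBLE": float, "STRING": str}


def to_instance(record):
    """Order a dict of feature values by the schema and cast each value."""
    missing = [name for name, _ in SCHEMA_INPUT if name not in record]
    if missing:
        raise KeyError(f"missing fields: {missing}")
    return [CASTS[kind](record[name]) for name, kind in SCHEMA_INPUT]


# Illustrative applicant record; the values are made up.
record = {"loan_amount": 120, "loan_term": "360", "property_area": "Urban"}
record.update({f"feature_{i}": 0.5 for i in range(10)})

payload = {"instances": [to_instance(record)]}
print(json.dumps(payload))
```

Driving the ordering from one field list keeps request construction in sync with the schema file, so a reordered or missing field fails loudly instead of silently shifting values.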

Write the model schema file:

In [None]:
%%writefile $SRC/schema.json
{
  "input": [
    {
      "name": "loan_amount",
      "type": "DOUBLE"
    },
    {
      "name": "loan_term",
      "type": "STRING"
    },
    {
      "name": "property_area",
      "type": "STRING"
    },
    {
      "name": "feature_7",
      "type": "DOUBLE"
    },
    {
      "name": "feature_3",
      "type": "DOUBLE"
    },
    {
      "name": "feature_1",
      "type": "DOUBLE"
    },
    {
      "name": "feature_9",
      "type": "DOUBLE"
    },
    {
      "name": "feature_5",
      "type": "DOUBLE"
    },
    {
      "name": "feature_0",
      "type": "DOUBLE"
    },
    {
      "name": "feature_8",
      "type": "DOUBLE"
    },
    {
      "name": "feature_4",
      "type": "DOUBLE"
    },
    {
      "name": "feature_2",
      "type": "DOUBLE"
    },
    {
      "name": "feature_6",
      "type": "DOUBLE"
    }
  ],
  "output": [
    {
      "name": "prediction",
      "type": "DOUBLE"
    }
  ]
}

### Copy the model schema configuration file to GCS

At startup, the serving container reads the model schema file from the location given by the `AIP_STORAGE_URI` environment variable, which Vertex AI points at (a copy of) the model's artifact directory; this is why the schema file is copied to `ARTIFACT_URI`. See [Import the model into Vertex AI](https://cloud.google.com/architecture/spark-ml-model-with-vertexai#import-the-model-into-vertex-ai) for more information.

In [None]:
! gsutil cp $SRC/schema.json $ARTIFACT_URI/schema.json

### Define your workflow as a Vertex AI Pipeline

Use the Kubeflow Pipelines SDK to define your workflow as a machine learning pipeline. The pipeline uses the custom components defined earlier, in addition to components from the `google-cloud-pipeline-components` package.
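The pipeline below nests two `Condition` blocks. As a reading aid, the following plain-Python sketch models which steps would run for a given evaluation result: hyperparameter tuning runs only if the AUPR returned by `evaluate_model` reaches the threshold, and model import and deployment run only inside that branch when `deploy_model` is true. It is a hypothetical model of the control flow, not KFP code, and the step names are descriptive labels.

```python
def steps_to_run(threshold_metric, threshold, deploy_model):
    """Return the pipeline steps that would execute, in order."""
    steps = [
        "build_preprocessing_args", "preprocess (Dataproc batch)",
        "build_training_args", "train (Dataproc batch)", "evaluate_model",
    ]
    # Outer Condition: evaluate_model's threshold_metric >= threshold.
    if threshold_metric >= threshold:
        steps += ["build_hpt_args", "hyperparameter tuning (Dataproc batch)"]
        # Inner Condition: KFP passes the bool parameter as a string,
        # which is why the pipeline compares it with "True".
        if str(deploy_model) == "True":
            steps += ["import_model_artifact", "ModelUploadOp",
                      "EndpointCreateOp", "ModelDeployOp"]
    return steps


print(steps_to_run(0.8, 0.7, True)[-1])  # ModelDeployOp
```

A run whose AUPR falls below the threshold therefore stops after `evaluate_model`, regardless of `deploy_model`.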

In [None]:
@dsl.pipeline(name=PIPELINE_NAME, description="A pipeline to train a PySpark model.")
def pipeline(
    preprocessing_main_python_file_uri: str = PREPROCESSING_PYTHON_FILE_URI,
    train_data_path: str = FEATURES_TRAIN_URI,
    preprocessed_data_path: str = PROCESSED_DATA_URI,
    training_main_python_file_uri: str = TRAINING_PYTHON_FILE_URI,
    train_path: str = PROCESSED_DATA_URI,
    model_path: str = MODEL_URI,
    metrics_path: str = METRICS_URI,
    threshold: float = AUPR_THRESHOLD,
    hpt_main_python_file_uri: str = HPT_PYTHON_FILE_URI,
    hpt_model_path: str = HPT_MODEL_URI,
    hpt_metrics_path: str = HPT_METRICS_URI,
    hpt_bundle_path: str = HPT_BUNDLE_URI,
    custom_container_image: str = RUNTIME_CONTAINER_IMAGE,
    model_name: str = MODEL_NAME,
    project_id: str = PROJECT_ID,
    location: str = REGION,
    subnetwork_uri: str = SUBNETWORK_URI,
    deploy_model: bool = DEPLOY_MODEL,
    artifact_uri: str = ARTIFACT_URI,
    serving_image_uri: str = SERVING_IMAGE_URI,
    dataproc_runtime_version: str = DATAPROC_RUNTIME_VERSION,
):
    from google_cloud_pipeline_components.v1.dataproc import \
        DataprocPySparkBatchOp
    from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                              ModelDeployOp)
    from google_cloud_pipeline_components.v1.model import ModelUploadOp

    # build preprocessed data args
    build_preprocessing_args_op = build_preprocessing_args(
        train_data_path=train_data_path, processed_data_path=preprocessed_data_path
    )

    # preprocess data
    data_preprocessing_op = DataprocPySparkBatchOp(
        project=project_id,
        location=location,
        container_image=custom_container_image,
        main_python_file_uri=preprocessing_main_python_file_uri,
        args=build_preprocessing_args_op.output,
        subnetwork_uri=subnetwork_uri,
        runtime_config_version=dataproc_runtime_version,
    ).after(build_preprocessing_args_op)

    # build training data args
    build_training_args_op = build_training_args(
        train_path=train_path,
        model_path=model_path,
        metrics_path=metrics_path,
    ).after(data_preprocessing_op)

    # training model
    model_training_op = DataprocPySparkBatchOp(
        project=project_id,
        location=location,
        container_image=custom_container_image,
        main_python_file_uri=training_main_python_file_uri,
        args=build_training_args_op.output,
        subnetwork_uri=subnetwork_uri,
        runtime_config_version=dataproc_runtime_version,
    ).after(build_training_args_op)

    evaluate_model_op = evaluate_model(metrics_uri=metrics_path).after(
        model_training_op
    )

    # evaluate condition
    with Condition(
        evaluate_model_op.outputs["threshold_metric"] >= threshold,
        name=AUPR_HYPERTUNE_CONDITION,
    ):
        build_hpt_args_op = build_hpt_args(
            train_path=train_path,
            model_path=hpt_model_path,
            metrics_path=hpt_metrics_path,
            bundle_path=hpt_bundle_path,
        ).after(evaluate_model_op)

        # hyperparameter tuning
        hyperparameter_tuning_op = DataprocPySparkBatchOp(
            project=project_id,
            location=location,
            container_image=custom_container_image,
            main_python_file_uri=hpt_main_python_file_uri,
            args=build_hpt_args_op.output,
            runtime_config_properties=HPT_RUNTIME_PROPERTIES,
            subnetwork_uri=subnetwork_uri,
            # TODO: change to Dataproc Serverless Runtime 1.1.x image when MLeap supports Spark 3.3
            runtime_config_version="1.0.29",
        ).after(model_training_op)

        # evaluate condition to upload and deploy model to Vertex AI
        with Condition(
            # kfp casts `bool` parameter to `str`
            deploy_model == "True",
            name=DEPLOY_MODEL_CONDITION,
        ):
            # import the model into the pipeline as a kfp model artifact
            import_model_artifact_op = import_model_artifact(
                artifact_uri=artifact_uri,
                serving_image_uri=serving_image_uri,
            )

            # upload model to Vertex AI
            model_upload_op = ModelUploadOp(
                project=project_id,
                location=location,
                display_name=model_name,
                unmanaged_container_model=import_model_artifact_op.outputs["model"],
            ).after(hyperparameter_tuning_op)

            # create a serving endpoint
            endpoint_op = EndpointCreateOp(
                project=project_id,
                location=location,
                display_name=model_name,
            ).after(model_upload_op)

            # deploy the model to the serving endpoint
            _ = ModelDeployOp(
                model=model_upload_op.outputs["model"],
                endpoint=endpoint_op.outputs["endpoint"],
                dedicated_resources_machine_type="n1-standard-2",
                dedicated_resources_min_replica_count=1,
                dedicated_resources_max_replica_count=1,
            ).after(endpoint_op)
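The two nested `Condition` blocks decide at runtime which parts of the pipeline execute. The following plain-Python sketch mirrors that gating logic outside of KFP so you can reason about it. `plan_steps` and the step labels are hypothetical; they only echo the op variable names in the pipeline. The string comparison for `deploy_model` follows the pipeline's own comment that KFP casts the `bool` parameter to `str`.

```python
from typing import List


def plan_steps(threshold_metric: float, threshold: float, deploy_model: str) -> List[str]:
    """Return the (hypothetical) step names that would run for the given inputs."""
    # These steps always run, in order.
    steps = ["build_training_args", "model_training", "evaluate_model"]
    # Outer Condition: tune hyperparameters only if the metric meets the threshold.
    if threshold_metric >= threshold:
        steps += ["build_hpt_args", "hyperparameter_tuning"]
        # Inner Condition: the bool parameter arrives as the string "True" or "False".
        if deploy_model == "True":
            steps += ["import_model_artifact", "model_upload", "endpoint_create", "model_deploy"]
    return steps


print(plan_steps(0.8, 0.7, "False"))
```

Comparing against the literal string matters: `bool("False")` is `True` in Python, so a truthiness check on the cast parameter would deploy the model even when you passed `False`.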

### Compile your pipeline into a JSON file

Now that you have defined the workflow of your pipeline, compile the pipeline into a JSON file.

In [None]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE_PATH)

### Submit your pipeline run

Next, you use the Vertex AI Python SDK to submit and run your pipeline through Vertex AI Pipelines.

The parameters, artifacts, and metrics produced from the pipeline run are automatically captured into Vertex AI Experiments as an experiment run.

In [None]:
pipeline = vertex_ai.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)

pipeline.submit(service_account=SERVICE_ACCOUNT, experiment=EXPERIMENT_NAME)

### Check the status of your pipeline run

Finally, you can check the status of your pipeline run through the link printed in the output of the previous cell. Alternatively, use the `wait()` method in the following cell to block until the pipeline run completes, and then check the status of the run.

In [None]:
pipeline.wait()
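Conceptually, waiting on a run means polling its state until it reaches a terminal value. The sketch below illustrates that pattern with exponential backoff. It is not how `PipelineJob.wait()` is implemented; the `get_state` callable is a hypothetical stand-in for however you read the run's state, and the state strings are assumed to follow Vertex AI's `PIPELINE_STATE_*` naming.

```python
import time
from typing import Callable

# Assumed terminal state names, modeled on Vertex AI's pipeline state enum.
TERMINAL_STATES = {
    "PIPELINE_STATE_SUCCEEDED",
    "PIPELINE_STATE_FAILED",
    "PIPELINE_STATE_CANCELLED",
}


def wait_for_terminal_state(
    get_state: Callable[[], str],
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    timeout: float = 3600.0,
) -> str:
    """Poll a hypothetical get_state() callable until the run reaches a terminal state."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still in state {state!r} after {timeout} seconds")
        time.sleep(delay)
        # Double the pause between polls, capped at max_delay.
        delay = min(delay * 2, max_delay)


# Simulated run that moves through two non-terminal states before succeeding.
states = iter(["PIPELINE_STATE_PENDING", "PIPELINE_STATE_RUNNING", "PIPELINE_STATE_SUCCEEDED"])
print(wait_for_terminal_state(lambda: next(states), initial_delay=0.0))
```

Backoff keeps the number of status requests low for long-running pipelines while still reacting quickly to runs that finish early.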

### (Optional) View experiment runs

You can retrieve the parameters, artifacts, and metrics for all experiment runs as a pandas DataFrame. See [Compare and analyze runs](https://cloud.google.com/vertex-ai/docs/experiments/compare-analyze-runs) for more information on the topic.

In [None]:
# get the experiment by name
experiment = vertex_ai.Experiment(experiment_name=EXPERIMENT_NAME)

# export the data as a dataframe
experiment_df = experiment.get_data_frame()

# Show successfully completed experiment runs, sorted by F1 score
experiment_df.query('state == "COMPLETE"').sort_values(
    "metric.Test_f1-score", ascending=False
)
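The cell above keeps only the rows whose `state` column equals `"COMPLETE"` and orders them by the `metric.Test_f1-score` column, best run first. The same filter-and-sort can be sketched in plain Python over a list of row dictionaries. `best_completed_runs` is a hypothetical helper, and the run names and scores are made-up placeholders, not results from this pipeline.

```python
from typing import Dict, List


def best_completed_runs(rows: List[Dict], metric: str = "metric.Test_f1-score") -> List[Dict]:
    """Keep completed runs and sort them by the given metric, highest first."""
    completed = [row for row in rows if row.get("state") == "COMPLETE"]
    return sorted(completed, key=lambda row: row[metric], reverse=True)


# Illustrative placeholder rows (not real experiment output).
rows = [
    {"run_name": "run-a", "state": "COMPLETE", "metric.Test_f1-score": 0.71},
    {"run_name": "run-b", "state": "FAILED", "metric.Test_f1-score": 0.90},
    {"run_name": "run-c", "state": "COMPLETE", "metric.Test_f1-score": 0.78},
]
ranked = best_completed_runs(rows)
print([row["run_name"] for row in ranked])  # ['run-c', 'run-a']
```

The failed run is excluded even though it has the highest score, which is why filtering on `state` comes before sorting.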

### (Optional) Get online predictions from the deployed model

If the model was deployed to a Vertex AI endpoint, you can request online predictions from it using either the `google-cloud-aiplatform` client library or `curl`.

For this model, the prediction response contains the predicted label (`0 == not eligible`, `1 == eligible`) for each prediction instance that is sent to the endpoint.
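Because the response carries only the numeric label, a small helper can translate it into readable text. This is a minimal sketch, assuming the predictions arrive as a list of numeric `0`/`1` labels (whether they are ints or floats such as `1.0` is an assumption, so the helper casts with `int`); `to_eligibility` is a hypothetical name.

```python
from typing import List, Sequence, Union

# Label mapping stated in this notebook: 0 == not eligible, 1 == eligible.
LABELS = {0: "not eligible", 1: "eligible"}


def to_eligibility(predictions: Sequence[Union[int, float]]) -> List[str]:
    """Map numeric predicted labels to human-readable eligibility strings."""
    return [LABELS[int(p)] for p in predictions]


print(to_eligibility([1.0, 0.0]))  # ['eligible', 'not eligible']
```

With the client library you could pass the `predictions` field of the object returned by `endpoint.predict(...)`. If the deployed model returned probabilities instead of labels, casting with `int` would be wrong and you would need an explicit threshold.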

#### Use `google-cloud-aiplatform` to request online predictions

The following cell demonstrates how to use the `google-cloud-aiplatform` client library to request predictions from one or more instances.

In [None]:
instances = [
    [214.0, "360", "Rural", 2.13, 2.21, 0.0, 0.0, 2.31, 2.01, 0.0, 0.0, 0.0, 0.0],
    [213.0, "360", "Semiurban", 2.03, 2.11, 0.0, 0.0, 2.13, 2.02, 0.0, 0.0, 0.0, 0.0],
]

endpoint = vertex_ai.Endpoint.list(filter=f'display_name="{MODEL_NAME}"')[-1]
endpoint.predict(instances)

#### Use `curl` to request online predictions

To use `curl`, first write the prediction instances to a file:

In [None]:
%%writefile instances.json
{
    "instances": [
        [214.0, "360", "Rural", 2.13, 2.21, 0.0, 0.0, 2.31, 2.01, 0.0, 0.0, 0.0, 0.0],
        [213.0, "360", "Semiurban", 2.03, 2.11, 0.0, 0.0, 2.13, 2.02, 0.0, 0.0, 0.0, 0.0]
    ]
}
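If you prefer to stay in Python rather than use the `%%writefile` magic, the following sketch writes an equivalent request body with the standard `json` module. It reuses the two instances from the cell above. The body is a JSON object whose `instances` key holds the list of instances, matching the file that `%%writefile` produces.

```python
import json

# Same two instances as in instances.json.
instances = [
    [214.0, "360", "Rural", 2.13, 2.21, 0.0, 0.0, 2.31, 2.01, 0.0, 0.0, 0.0, 0.0],
    [213.0, "360", "Semiurban", 2.03, 2.11, 0.0, 0.0, 2.13, 2.02, 0.0, 0.0, 0.0, 0.0],
]

# Wrap the instances under the "instances" key expected by the predict request.
with open("instances.json", "w") as f:
    json.dump({"instances": instances}, f, indent=4)

# Read the file back to confirm it round-trips.
with open("instances.json") as f:
    body = json.load(f)
assert body["instances"] == instances
```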

Use `curl` to send the prediction request to the Vertex AI endpoint:

In [None]:
! curl -X POST \
   -H "Authorization: Bearer $(gcloud auth print-access-token)" \
   -H "Content-Type: application/json" \
   https://{REGION}-aiplatform.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/endpoints/{endpoint.name}:predict \
   -d "@instances.json"
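The `curl` command posts the request body to the endpoint's `:predict` URL, which combines the region, the project ID, and the endpoint ID. A minimal sketch of the same request built with Python's standard `urllib.request` follows, assuming you already have an OAuth access token string (for example from `gcloud auth print-access-token`). The helper names and the placeholder project, endpoint, and token values are hypothetical, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_predict_url(region: str, project_id: str, endpoint_id: str) -> str:
    """Build the regional Vertex AI :predict URL used in the curl command."""
    return (
        f"https://{region}-aiplatform.googleapis.com/v1/"
        f"projects/{project_id}/locations/{region}/endpoints/{endpoint_id}:predict"
    )


def build_predict_request(url: str, token: str, body: dict) -> urllib.request.Request:
    """Create (but do not send) a POST request equivalent to the curl call."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values for illustration only.
url = build_predict_url("us-central1", "my-project", "1234567890")
body = {"instances": [[214.0, "360", "Rural", 2.13, 2.21, 0.0, 0.0, 2.31, 2.01, 0.0, 0.0, 0.0, 0.0]]}
request = build_predict_request(url, "ACCESS_TOKEN", body)
print(request.get_method(), request.full_url)
```

To actually send it you would pass the request to `urllib.request.urlopen` with a real token and endpoint ID; the client library cell earlier handles authentication for you.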

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

- Vertex AI Pipeline
- Vertex AI Endpoint
- Vertex AI Model
- Vertex AI Experiment
- Artifact Registry repository
- Cloud Storage bucket
- Local src, build and cloned repo folders

In [None]:
# Delete pipeline
pipeline.delete()

# Delete endpoints
endpoint_list = vertex_ai.Endpoint.list(filter=f'display_name="{MODEL_NAME}"')
for endpoint in endpoint_list:
    endpoint.undeploy_all()
    endpoint.delete()

# Delete model
model_list = vertex_ai.Model.list(filter=f'display_name="{MODEL_NAME}"')
for model in model_list:
    model.delete()

# Delete experiment
experiment.delete()
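The cleanup cell undeploys all models from each endpoint before deleting it, and deletes the endpoints before the models, because Vertex AI refuses to delete an endpoint that still has deployed models, or a model that is still deployed. The toy sketch below models only that ordering constraint in memory; `FakeModel` and `FakeEndpoint` are hypothetical classes, not part of the Vertex AI SDK.

```python
from typing import List, Set


class FakeModel:
    """Toy stand-in for a model resource (hypothetical, not the Vertex AI SDK)."""

    def __init__(self, name: str):
        self.name = name
        self.deployed_to: Set[str] = set()
        self.deleted = False

    def delete(self) -> None:
        # A deployed model cannot be deleted.
        if self.deployed_to:
            raise RuntimeError(f"{self.name} is still deployed; undeploy it first")
        self.deleted = True


class FakeEndpoint:
    """Toy stand-in for an endpoint resource (hypothetical)."""

    def __init__(self, name: str):
        self.name = name
        self.models: List[FakeModel] = []
        self.deleted = False

    def deploy(self, model: FakeModel) -> None:
        self.models.append(model)
        model.deployed_to.add(self.name)

    def undeploy_all(self) -> None:
        for model in self.models:
            model.deployed_to.discard(self.name)
        self.models.clear()

    def delete(self) -> None:
        # An endpoint with deployed models cannot be deleted.
        if self.models:
            raise RuntimeError(f"{self.name} still has deployed models")
        self.deleted = True


model = FakeModel("loan-model")
endpoint = FakeEndpoint("loan-endpoint")
endpoint.deploy(model)

# Same order as the cleanup cell: undeploy, delete the endpoint, then the model.
endpoint.undeploy_all()
endpoint.delete()
model.delete()
print(endpoint.deleted, model.deleted)  # True True
```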

In [None]:
# Delete the Artifact repository
! gcloud artifacts repositories delete $REPO_NAME --location=$REGION --quiet

Set `delete_bucket` to **True** to delete the Cloud Storage bucket used in this notebook.

In [None]:
# Delete the Cloud Storage bucket
delete_bucket = False
if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil -m rm -r $BUCKET_URI
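Note that `os.getenv("IS_TESTING")` returns the variable's value as a string, or `None` if it is unset, so the bucket is deleted whenever `IS_TESTING` is set to any non-empty value, including `"False"` or `"0"`. The sketch below demonstrates this. `should_delete_bucket` and `parse_flag` are hypothetical helpers, and the stricter parsing in `parse_flag` is one possible convention, not something this notebook uses.

```python
from typing import Optional


def should_delete_bucket(delete_bucket: bool, is_testing: Optional[str]) -> bool:
    """Replicate the notebook's condition; pass os.getenv("IS_TESTING") as is_testing."""
    # Any non-empty string is truthy, so "False" and "0" both trigger deletion.
    return bool(delete_bucket or is_testing)


def parse_flag(value: Optional[str]) -> bool:
    """Stricter (hypothetical) alternative: only explicit true-like values count."""
    return (value or "").strip().lower() in {"1", "true", "yes"}


print(should_delete_bucket(False, "False"))  # True
print(parse_flag("False"))  # False
```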

In [None]:
# remove the local src, build and repo folders
!rm -rf $SRC $BUILD_PATH cloud-builders-community vertex-ai-spark-ml-serving