In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: AutoML Tabular pipelines using google-cloud-pipeline-components

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/automl_tabular_classification_beans.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Fautoml_tabular_classification_beans.ipynb">
      <img width="32px" src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/automl_tabular_classification_beans.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/automl_tabular_classification_beans.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

## Overview

This notebook shows how to use the components from [*google_cloud_pipeline_components*](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud) SDK to build an AutoML tabular classification workflow in [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines).

You build a pipeline in this notebook that looks as below:

<a href="https://storage.googleapis.com/amy-jo/images/mp/beans.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/beans.png" width="95%"/></a>

Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) and [AutoML components](https://cloud.google.com/vertex-ai/docs/pipelines/vertex-automl-component). Learn more about [Classification for tabular data](https://cloud.google.com/vertex-ai/docs/tabular-data/classification-regression/overview).

### Objective

In this tutorial, you learn to use Vertex AI Pipelines and Google Cloud Pipeline Components to build an AutoML tabular classification model.


This tutorial uses the following Vertex AI services:

- Vertex AI Pipelines
- Google Cloud Pipeline Components
- Vertex AutoML
- Vertex AI Model
- Vertex AI Endpoint

The steps performed include:

- Create a KFP pipeline that creates a Vertex AI Dataset.
- Add a component to the pipeline that trains an AutoML tabular classification model resource.
- Add a component that creates a Vertex AI endpoint resource.
- Add a component that deploys the model resource to the endpoint resource.
- Compile the KFP pipeline.
- Execute the KFP pipeline using Vertex AI Pipelines.

Learn more about the [Vertex AI components in Google Cloud Pipeline Components SDK](https://google-cloud-pipeline-components.readthedocs.io/en/latest/google_cloud_pipeline_components.aiplatform.html#module-google_cloud_pipeline_components.aiplatform).

### Dataset

This tutorial uses the UCI Machine Learning ['Dry beans dataset'](https://archive.ics.uci.edu/ml/datasets/Dry+Bean+Dataset), from: KOKLU, M. and OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques". In Computers and Electronics in Agriculture, 174, 105507. [DOI](https://doi.org/10.1016/j.compag.2020.105507).

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Get started

### Install Vertex AI SDK for Python and other required packages


In [None]:
! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

if "google.colab" in sys.modules:

    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


#### Check the package versions

Check the versions of the packages you installed.  The KFP SDK version should be >=1.8.

In [None]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [None]:
import sys

if "google.colab" in sys.modules:

    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information 

To get started using Vertex AI, you must have an existing Google Cloud project. Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

### Service Account

**If you don't know your service account**, try to get your service account using `gcloud` command by executing the second cell below.

In [None]:
SERVICE_ACCOUNT = ""  # @param {type:"string"}

In [None]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run these once per service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Import libraries and define constants

In [None]:
from typing import NamedTuple

import google.cloud.aiplatform as aiplatform
import kfp
from google.cloud import bigquery
from kfp import compiler, dsl
from kfp.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output,
                     component)

#### Vertex AI constants

Setup up the following constants for Vertex AI pipeline:
- `PIPELINE_NAME`: Set name for the pipeline.
- `PIPELINE_ROOT`: Cloud Storage bucket path to store pipeline artifacts.

In [None]:
# set path for storing the pipeline artifacts
PIPELINE_NAME = "automl-tabular-beans-training"
PIPELINE_ROOT = "{}/pipeline_root/beans".format(BUCKET_URI)

### Initialize Vertex AI SDK for Python

To get started using Vertex AI, you must [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)

## Define a custom component for metrics evaluation 

In this tutorial, you define one custom pipeline component. The remaining components are pre-built
components for Vertex AI services.

The custom pipeline component you define is a Python function-based component.
Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you.

Note the `@component` decorator.  When you evaluate the `classification_model_eval` function, the component is compiled to what is essentially a task factory function, that can be used in the the pipeline definition.

In addition, a **tabular_eval_component.yaml** component definition file is generated.  The component **yaml** file can be shared & placed under version control, and used later to define a pipeline step.

The component definition specifies a base image for the component to use, and specifies that the `google-cloud-aiplatform` package should be installed. When not specified, the base image defaults to Python 3.7.

The custom pipeline component you create in this step retrieves the classification model evaluation metrics generated by the AutoML tabular training process. Then, it parses the evaluation data, and renders the ROC curve and confusion matrix for the model. Further, the set thresholds are checked against the generated metrics to determine whether the model is accurate enough for deployment.

**Note**: This custom component is specific to an AutoML tabular classification task.

In [None]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform

    aiplatform.init(project=project)

    # Fetch model eval info
    def get_eval_info(model):
        response = model.list_model_evaluations()
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            evaluation = evaluation.to_dict()
            print("model_evaluation")
            print(" name:", evaluation["name"])
            print(" metrics_schema_uri:", evaluation["metricsSchemaUri"])
            metrics = evaluation["metrics"]
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation["name"],
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)

    logging.getLogger().setLevel(logging.INFO)

    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    # Get the trained model resource
    model = aiplatform.Model(model_resource_path)

    # Get model evaluation metrics from the the trained model
    eval_name, metrics_list, metrics_str_list = get_eval_info(model)
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)


compiler.Compiler().compile(
    classification_model_eval_metrics, "tabular_eval_component.yaml"
)

## Define pipeline 

Define the pipeline for AutoML tabular classification using the components from `google_cloud_pipeline_components` package.

In [None]:
@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str,
    DATASET_DISPLAY_NAME: str,
    TRAINING_DISPLAY_NAME: str,
    MODEL_DISPLAY_NAME: str,
    ENDPOINT_DISPLAY_NAME: str,
    MACHINE_TYPE: str,
    project: str,
    gcp_region: str,
    thresholds_dict_str: str,
):
    from google_cloud_pipeline_components.v1.automl.training_job import \
        AutoMLTabularTrainingJobRunOp
    from google_cloud_pipeline_components.v1.dataset.create_tabular_dataset.component import \
        tabular_dataset_create as TabularDatasetCreateOp
    from google_cloud_pipeline_components.v1.endpoint.create_endpoint.component import \
        endpoint_create as EndpointCreateOp
    from google_cloud_pipeline_components.v1.endpoint.deploy_model.component import \
        model_deploy as ModelDeployOp

    dataset_create_op = TabularDatasetCreateOp(
        project=project,
        location=gcp_region,
        display_name=DATASET_DISPLAY_NAME,
        bq_source=bq_source,
    )

    training_op = AutoMLTabularTrainingJobRunOp(
        project=project,
        location=gcp_region,
        display_name=TRAINING_DISPLAY_NAME,
        optimization_prediction_type="classification",
        optimization_objective="minimize-log-loss",
        budget_milli_node_hours=1000,
        model_display_name=MODEL_DISPLAY_NAME,
        column_specs={
            "Area": "numeric",
            "Perimeter": "numeric",
            "MajorAxisLength": "numeric",
            "MinorAxisLength": "numeric",
            "AspectRation": "numeric",
            "Eccentricity": "numeric",
            "ConvexArea": "numeric",
            "EquivDiameter": "numeric",
            "Extent": "numeric",
            "Solidity": "numeric",
            "roundness": "numeric",
            "Compactness": "numeric",
            "ShapeFactor1": "numeric",
            "ShapeFactor2": "numeric",
            "ShapeFactor3": "numeric",
            "ShapeFactor4": "numeric",
            "Class": "categorical",
        },
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )

    model_eval_task = classification_model_eval_metrics(
        project=project,
        location=gcp_region,
        thresholds_dict_str=thresholds_dict_str,
        model=training_op.outputs["model"],
    )

    with dsl.If(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name=ENDPOINT_DISPLAY_NAME,
        )

        ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type=MACHINE_TYPE,
        )

## Compile the pipeline

Next, compile the pipeline to a yaml file.

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="tabular_classification_pipeline.yaml",
)

## Run the pipeline

Pass the input parameters required for the pipeline and run it. The defined pipeline takes the following parameters:

- `bq_source`: BigQuery source for the tabular dataset.
- `DATASET_DISPLAY_NAME`: Display name for the Vertex AI managed dataset.
- `TRAINIG_DISPLAY_NAME`: Display name for the AutoML training job.
- `MODEL_DISPLAY_NAME`: Display name for the Vertex AI model generated as a result of the training job.
- `ENDPOINT_DISPLAY_NAME`: Display name for the Vertex AI endpoint where the model is deployed.
- `MACHINE_TYPE`: Machine type for the serving container.
- `project`: Id of the project where the pipeline is run.
- `gcp_region`: Region for setting the pipeline location.
- `thresholds_dict_str`: Dictionary of thresholds based on which the model deployment is conditioned.
- `pipeline_root`:  To override the pipeline root path specified in the pipeline job's definition, specify a path that your pipeline job can access, such as a Cloud Storage bucket URI.

In [None]:
# Set the display-names for Vertex AI resources
PIPELINE_DISPLAY_NAME = "[your-pipeline-display-name]"  # @param {type:"string"}
DATASET_DISPLAY_NAME = "[your-dataset-display-name]"  # @param {type:"string"}
MODEL_DISPLAY_NAME = "[your-model-display-name]"  # @param {type:"string"}
TRAINING_DISPLAY_NAME = "[your-training-job-display-name]"  # @param {type:"string"}
ENDPOINT_DISPLAY_NAME = "[your-endpoint-display-name]"  # @param {type:"string"}

# Otherwise, use the default display-names
if PIPELINE_DISPLAY_NAME == "[your-pipeline-display-name]":
    PIPELINE_DISPLAY_NAME = "pipeline_beans-unique"

if DATASET_DISPLAY_NAME == "[your-dataset-display-name]":
    DATASET_DISPLAY_NAME = "dataset_beans-unique"

if MODEL_DISPLAY_NAME == "[your-model-display-name]":
    MODEL_DISPLAY_NAME = "model_beans-unique"

if TRAINING_DISPLAY_NAME == "[your-training-job-display-name]":
    TRAINING_DISPLAY_NAME = "automl_training_beans-unique"

if ENDPOINT_DISPLAY_NAME == "[your-endpoint-display-name]":
    ENDPOINT_DISPLAY_NAME = "endpoint_beans-unique"

# Set machine type
MACHINE_TYPE = "n1-standard-4"

In [None]:
# Validate region of the given source (BigQuery) against region of the pipeline
bq_source = "aju-dev-demos.beans.beans1"

client = bigquery.Client()
bq_region = client.get_table(bq_source).location.lower()
try:
    assert bq_region in LOCATION
    print(f"Region validated: {LOCATION}")
except AssertionError:
    print(
        "Please make sure the region of BigQuery (source) and that of the pipeline are the same."
    )

# Configure the pipeline
job = aiplatform.PipelineJob(
    display_name=PIPELINE_DISPLAY_NAME,
    template_path="tabular_classification_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "project": PROJECT_ID,
        "gcp_region": LOCATION,
        "bq_source": f"bq://{bq_source}",
        "thresholds_dict_str": '{"auRoc": 0.95}',
        "DATASET_DISPLAY_NAME": DATASET_DISPLAY_NAME,
        "TRAINING_DISPLAY_NAME": TRAINING_DISPLAY_NAME,
        "MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME,
        "ENDPOINT_DISPLAY_NAME": ENDPOINT_DISPLAY_NAME,
        "MACHINE_TYPE": MACHINE_TYPE,
    },
    enable_caching=True,
)

Run the pipeline job. Click on the generated link to see your run in the Cloud Console.

In [None]:
# Run the job
job.run()

## Check the parameters and metrics of the pipeline run from their tracked metadata

Next, you use the Vertex AI SDK for Python to check the parameters and metrics of the pipeline run. Wait until the pipeline run has finished to run the next cell.

In [None]:
pipeline_df = aiplatform.get_pipeline_df(pipeline=PIPELINE_NAME)
print(pipeline_df.head(2))

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial.

In [None]:
# Delete the Vertex AI Pipeline Job
job.delete()

# List and filter the Vertex AI Endpoint
endpoints = aiplatform.Endpoint.list(
    filter=f"display_name={ENDPOINT_DISPLAY_NAME}", order_by="create_time"
)
# Delete the Vertex AI Endpoint
if len(endpoints) > 0:
    endpoint = endpoints[0]
    endpoint.delete(force=True)

# List and filter the Vertex AI model
models = aiplatform.Model.list(
    filter=f"display_name={MODEL_DISPLAY_NAME}", order_by="create_time"
)
# Delete the Vertex AI model
if len(models) > 0:
    model = models[0]
    model.delete()

# List and filter the Vertex AI Dataset
datasets = aiplatform.TabularDataset.list(
    filter=f"display_name={DATASET_DISPLAY_NAME}", order_by="create_time"
)
# Delete the Vertex AI Dataset
if len(datasets) > 0:
    dataset = datasets[0]
    dataset.delete()

# Delete the Cloud Storage bucket
delete_bucket = False  # Set True for deletion
if delete_bucket:
    ! gsutil rm -r $BUCKET_URI