In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# AutoML Tabular Workflow pipelines

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/automl/automl_tabular_on_vertex_pipelines.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fautoml%2Fautoml_tabular_on_vertex_pipelines.ipynb">
      <img width="32px" src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/automl/automl_tabular_on_vertex_pipelines.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/automl/automl_tabular_on_vertex_pipelines.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

## Overview

In this tutorial, you will use two Vertex AI Tabular Workflows pipelines to train AutoML models using different configurations. You will see how `get_automl_tabular_pipeline_and_parameters` gives you the ability to customize the default AutoML Tabular pipeline, and how `get_skip_architecture_search_pipeline_and_parameters` allows you to reduce the training time and cost for an AutoML model by using the tuning results from a previous pipeline run.

Learn more about [Tabular Workflow for E2E AutoML](https://cloud.google.com/vertex-ai/docs/tabular-data/tabular-workflows/e2e-automl).

### Objective

In this tutorial, you learn how to create two classification models using [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) downloaded from [Google Cloud Pipeline Components](https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction) (GCPC). These pipelines are Vertex AI Tabular Workflow pipelines, which are maintained by Google. They showcase different ways to customize the Vertex AI Tabular training process.

This tutorial uses the following Google Cloud ML services:

- `AutoML Training`
- `Vertex AI Datasets`

The steps performed are:

- Create a training pipeline that reduces the search space from the default to save time.
- Create a training pipeline that reuses the architecture search results from the previous pipeline to save time.

### Dataset

The dataset you will be using is [Bank Marketing](https://archive.ics.uci.edu/ml/datasets/bank+marketing).
The data is for direct marketing campaigns (phone calls) of a Portuguese banking institution. The binary classification goal is to predict whether a client will subscribe to a term deposit. For this notebook, we randomly selected 90% of the rows in the original dataset and saved them in a train.csv file hosted on Cloud Storage. To download the file, click [here](https://storage.googleapis.com/cloud-samples-data/vertex-ai/tabular-workflows/datasets/bank-marketing/train.csv).

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Get started

### Install Vertex AI SDK for Python and other required packages

In [None]:
!pip3 install --upgrade --quiet google-cloud-pipeline-components==1.0.45 \
                                google-cloud-aiplatform

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

# Restart the kernel only when the Colab module is loaded, so the packages
# installed above are picked up by a fresh runtime.
if "google.colab" in sys.modules:
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>

### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.

In [None]:
import sys

# The interactive Google login is triggered only when the Colab module is
# loaded; other environments are presumably already authenticated — TODO confirm.
if "google.colab" in sys.modules:

    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com). Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
# Google Cloud project used for the bucket, the Vertex AI SDK and the pipelines.
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
# Region used when creating the bucket and when launching the pipeline jobs.
LOCATION = "us-central1"  # @param {type: "string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
# gs:// URI of the bucket; PROJECT_ID is embedded in the name, so it must be set
# before this cell runs. Used below as the parent of the pipeline root directory.
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $LOCATION $BUCKET_URI

#### Service Account

You use a service account to create Vertex AI Pipeline jobs. If you don't want to use your project's Compute Engine service account, set `SERVICE_ACCOUNT` to another service account ID.

In [None]:
# Leave the placeholder (or set "" / None) to have the next cell look the
# account up through gcloud; otherwise set an explicit service account ID.
SERVICE_ACCOUNT = "[your-service-account]"

In [None]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines
Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run this step once per service account.

In [None]:
# Grant the service account the objectCreator role on the bucket.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

# Grant the service account the objectViewer role on the bucket.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

## Import libraries and define constants

In [None]:
import json
# Import required modules
import os
from typing import Any, Dict, List

from google.cloud import aiplatform, storage
from google_cloud_pipeline_components.experimental.automl.tabular import \
    utils as automl_tabular_utils

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project.

In [None]:
# Configure the Vertex AI SDK with the project and region chosen above.
aiplatform.init(project=PROJECT_ID, location=LOCATION)

### Define helper functions

In [None]:
def get_bucket_name_and_path(uri):
    """Split a ``gs://bucket/path`` URI into ``(bucket, path)``.

    The first five characters (the length of ``"gs://"``) are dropped without
    checking them. The remainder is cut at the first ``/``; ``path`` is the
    empty string when there is nothing after the bucket name.
    """
    bucket_name, _, object_path = uri[len("gs://") :].partition("/")
    return bucket_name, object_path


def download_from_gcs(uri):
    """Download the object at a ``gs://`` URI and return its content.

    The value returned is whatever ``blob.download_as_string()`` yields for
    the object; the storage client is created for ``PROJECT_ID``.
    """
    bucket_name, object_path = get_bucket_name_and_path(uri)
    client = storage.Client(project=PROJECT_ID)
    return client.get_bucket(bucket_name).blob(object_path).download_as_string()


def write_to_gcs(uri: str, content: str):
    """Upload ``content`` to the object addressed by a ``gs://`` URI.

    Args:
        uri: Destination in ``gs://bucket/path`` form.
        content: Text to store in the object.

    The client is created for ``PROJECT_ID``, matching ``download_from_gcs``,
    so the upload does not depend on a project being inferable from the
    environment.
    """
    bucket_name, path = get_bucket_name_and_path(uri)
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(path)
    blob.upload_from_string(content)


def generate_auto_transformation(column_names: List[str]) -> List[Dict[str, Any]]:
    """Build one ``{"auto": {"column_name": ...}}`` entry per column.

    The entries are returned in the same order as ``column_names``.
    """
    return [{"auto": {"column_name": name}} for name in column_names]


def write_auto_transformations(uri: str, column_names: List[str]):
    """Serialize the "auto" transformations for the columns to JSON and upload them to ``uri``."""
    payload = json.dumps(generate_auto_transformation(column_names))
    write_to_gcs(uri, payload)


def get_task_detail(task_details: List[Any], task_name: str) -> Any:
    """Return the first task detail whose ``task_name`` equals ``task_name``.

    Args:
        task_details: Task detail objects exposing a ``task_name`` attribute
            (they are read by attribute, not as dictionaries).
        task_name: Name of the pipeline task to look up.

    Returns:
        The first matching task detail, or ``None`` when no task matches.
    """
    return next(
        (detail for detail in task_details if detail.task_name == task_name),
        None,
    )


def get_deployed_model_uri(
    task_details,
):
    """Return the URI of the first ``model`` artifact of the "model-upload" task."""
    upload_task = get_task_detail(task_details, "model-upload")
    model_artifacts = upload_task.outputs["model"].artifacts
    return model_artifacts[0].uri


def get_no_custom_ops_model_uri(task_details):
    """Return the URI of the model exported without custom TF ops.

    Reads the first ``model_without_custom_ops`` artifact of the
    "automl-tabular-ensemble" task and returns its ``uri`` unchanged. The
    artifact is not downloaded: callers of this function expect the location
    of the model, not its content.

    Args:
        task_details: Task details of a finished pipeline job.
    """
    ensemble_task = get_task_detail(task_details, "automl-tabular-ensemble")
    return ensemble_task.outputs["model_without_custom_ops"].artifacts[0].uri


def get_feature_attributions(
    task_details,
):
    """Download the ``feature_attributions`` artifact of the "feature-attribution-2" task."""
    attribution_task = get_task_detail(task_details, "feature-attribution-2")
    artifact_uri = attribution_task.outputs["feature_attributions"].artifacts[0].uri
    return download_from_gcs(artifact_uri)


def get_evaluation_metrics(
    task_details,
):
    """Download the ``evaluation_metrics`` artifact of the "model-evaluation-2" task."""
    evaluation_task = get_task_detail(task_details, "model-evaluation-2")
    artifact_uri = evaluation_task.outputs["evaluation_metrics"].artifacts[0].uri
    return download_from_gcs(artifact_uri)


def load_and_print_json(s):
    """Parse ``s`` as JSON and print it with 2-space indentation and sorted keys."""
    print(json.dumps(json.loads(s), indent=2, sort_keys=True))

### Define training specification

In [None]:
# Switches forwarded to the pipeline builders further below.
run_evaluation = True  # @param {type:"boolean"}
run_distillation = False  # @param {type:"boolean"}
# Directory under the bucket used as pipeline root and for the transform config.
root_dir = os.path.join(BUCKET_URI, "automl_tabular_pipeline")
prediction_type = "classification"
optimization_objective = "minimize-log-loss"
target_column = "deposit"
# Exactly one data source is filled in here: the CSV on Cloud Storage.
data_source_csv_filenames = "gs://cloud-samples-data/vertex-ai/tabular-workflows/datasets/bank-marketing/train.csv"
data_source_bigquery_table_path = None  # format: bq://bq_project.bq_dataset.bq_table

timestamp_split_key = None  # timestamp column name when using timestamp split
stratified_split_key = None  # target column name when using stratified split
training_fraction = 0.8
validation_fraction = 0.1
test_fraction = 0.1

# When a predefined split column is set, the fraction-based split is cleared.
predefined_split_key = None
if predefined_split_key:
    training_fraction = None
    validation_fraction = None
    test_fraction = None

weight_column = None

# Input columns; each one gets an "auto" transformation below.
features = [
    "age",
    "job",
    "marital",
    "education",
    "default",
    "balance",
    "housing",
    "loan",
    "contact",
    "day",
    "month",
    "duration",
    "campaign",
    "pdays",
    "previous",
    "poutcome",
]
# Side effect: uploads the transformation config as JSON under root_dir, so
# this cell needs write access to the bucket.
transformations = generate_auto_transformation(features)
transform_config_path = os.path.join(root_dir, "transform_config_unique.json")
write_to_gcs(transform_config_path, json.dumps(transformations))

## VPC related config

If you need to use a custom Dataflow subnetwork, you can set it through the `dataflow_subnetwork` parameter. The requirements are:
1. `dataflow_subnetwork` must be a fully qualified subnetwork name.
   [[reference](https://cloud.google.com/dataflow/docs/guides/specifying-networks#example_network_and_subnetwork_specifications)]
1. The following service accounts must have [Compute Network User role](https://cloud.google.com/compute/docs/access/iam#compute.networkUser) assigned on the specified dataflow subnetwork [[reference](https://cloud.google.com/dataflow/docs/guides/specifying-networks#shared)]:
    1. Compute Engine default service account: PROJECT_NUMBER-compute@developer.gserviceaccount.com
    1. Dataflow service account: service-PROJECT_NUMBER@dataflow-service-producer-prod.iam.gserviceaccount.com

If your project has VPC-SC enabled, please make sure:

1. The dataflow subnetwork used in VPC-SC is configured properly for Dataflow.
   [[reference](https://cloud.google.com/dataflow/docs/guides/routes-firewall)]
1. `dataflow_use_public_ips` is set to False.


In [None]:
# Networking options forwarded to both pipeline builders below.
# Dataflow's fully qualified subnetwork name; when empty, the default subnetwork is used.
# A fully qualified subnetwork name has the form
# https://www.googleapis.com/compute/v1/projects/HOST_PROJECT_ID/regions/REGION_NAME/subnetworks/SUBNETWORK_NAME
# reference: https://cloud.google.com/dataflow/docs/guides/specifying-networks#example_network_and_subnetwork_specifications
dataflow_subnetwork = None  # @param {type:"string"}
# Specifies whether Dataflow workers use public IP addresses.
dataflow_use_public_ips = True  # @param {type:"boolean"}

## Customize search space and change training configuration

You create an AutoML Tabular pipeline with the following customizations:
- Limit the hyperparameter search space
- Change machine type and tuning / training parallelism

In [None]:
# Restrict the hyperparameter search space to a single model type.
study_spec_parameters_override = [
    {
        "parameter_id": "model_type",
        "categorical_value_spec": {
            "values": [
                "nn"
            ]  # The default value is ["nn", "boosted_trees"], this reduces the search space
        },
    }
]

# Machine-type overrides, one entry per worker pool role; the same list is
# reused for the stage 1 tuner and the CV trainer in the call below.
worker_pool_specs_override = [
    {"machine_spec": {"machine_type": "n1-standard-8"}},  # override for TF chief node
    {},  # override for TF worker node, since it's not used, leave it empty
    {},  # override for TF ps node, since it's not used, leave it empty
    {
        "machine_spec": {
            "machine_type": "n1-standard-4"  # override for TF evaluator node
        }
    },
]

# Number of weak models in the final ensemble model is
# stage_2_num_selected_trials * 5. If unspecified, 5 is the default value for
# stage_2_num_selected_trials.
stage_2_num_selected_trials = 5

# The pipeline outputs a TF saved model that contains the following TF custom op:
# - https://github.com/google/struct2tensor
#
# There are a few ways to run the model:
# - Official prediction server docker image
#   Please follow the "Run the model server" section in
#   https://cloud.google.com/vertex-ai/docs/export/export-model-tabular#run-server
# - Python or cpp runtimes like TF serving
#   Please set export_additional_model_without_custom_ops so the pipeline
#   outputs an additional model that does not depend on struct2tensor.
#   - `get_no_custom_ops_model_uri` shows how to get the model artifact URI.
#   - The input to the model is a dictionary of feature name to tensor. Use
#     `saved_model_cli show --dir {saved_model.pb's path} --signature_def serving_default --tag serve`
#     to find out more details.
export_additional_model_without_custom_ops = False

train_budget_milli_node_hours = 1000  # 1 hour

# Build the pipeline template and its parameter values for the customized
# AutoML Tabular run. Everything configured in the cells above is forwarded,
# including stage_2_num_selected_trials, which was previously defined but
# never passed on.
(
    template_path,
    parameter_values,
) = automl_tabular_utils.get_automl_tabular_pipeline_and_parameters(
    PROJECT_ID,
    LOCATION,
    root_dir,
    target_column,
    prediction_type,
    optimization_objective,
    transform_config_path,
    train_budget_milli_node_hours,
    stage_2_num_selected_trials=stage_2_num_selected_trials,
    data_source_csv_filenames=data_source_csv_filenames,
    data_source_bigquery_table_path=data_source_bigquery_table_path,
    weight_column=weight_column,
    predefined_split_key=predefined_split_key,
    timestamp_split_key=timestamp_split_key,
    stratified_split_key=stratified_split_key,
    training_fraction=training_fraction,
    validation_fraction=validation_fraction,
    test_fraction=test_fraction,
    study_spec_parameters_override=study_spec_parameters_override,
    stage_1_tuner_worker_pool_specs_override=worker_pool_specs_override,
    cv_trainer_worker_pool_specs_override=worker_pool_specs_override,
    run_evaluation=run_evaluation,
    run_distillation=run_distillation,
    dataflow_subnetwork=dataflow_subnetwork,
    dataflow_use_public_ips=dataflow_use_public_ips,
    export_additional_model_without_custom_ops=export_additional_model_without_custom_ops,
)

# The same string is used as display name and as job ID; the cleanup cell
# later rebuilds the job's resource name from this ID.
job_id = "automl-tabular-unique"
job = aiplatform.PipelineJob(
    display_name=job_id,
    location=LOCATION,  # launches the pipeline job in the specified location
    template_path=template_path,
    job_id=job_id,
    pipeline_root=root_dir,
    parameter_values=parameter_values,
    enable_caching=False,
)

# Presumably blocks until the pipeline finishes, since the results are read
# right afterwards — TODO confirm.
job.run()


# Task details of the first pipeline; reused later to find the stage 1 tuning result.
pipeline_task_details = job.gca_resource.job_detail.task_details

if export_additional_model_without_custom_ops:
    print(
        "trained model without custom TF ops:",
        get_no_custom_ops_model_uri(pipeline_task_details),
    )

# Metrics and attributions are only looked up when evaluation was requested.
if run_evaluation:
    print("evaluation metrics:")
    load_and_print_json(get_evaluation_metrics(pipeline_task_details))

    print("feature attributions:")
    load_and_print_json(get_feature_attributions(pipeline_task_details))

# Remember the job ID so the cleanup cell can retrieve and delete this job.
automl_tabular_pipeline_job_name = job_id

## Skip architecture search
Instead of running an architecture search every time, you can reuse an existing architecture search result. This helps to:
1. reduce the variation of the output model
2. reduce the training cost

The existing architecture search result is stored in the `tuning_result_output` output of the `automl-tabular-stage-1-tuner` component. You can manually input it or get it programmatically.

In [None]:
# Find the stage 1 tuner task of the first pipeline run. get_task_detail
# returns None when no task has this name, in which case the attribute access
# below fails.
stage_1_tuner_task = get_task_detail(
    pipeline_task_details, "automl-tabular-stage-1-tuner"
)

# URI of the first "tuning_result_output" artifact; passed to the
# skip-architecture-search pipeline in the next cell.
stage_1_tuning_result_artifact_uri = (
    stage_1_tuner_task.outputs["tuning_result_output"].artifacts[0].uri
)

### Run the skip architecture search pipeline


In [None]:
# Build the template and parameters for the second run. It reuses the data,
# split and objective settings of the first pipeline and adds the stage 1
# tuning result URI obtained above. template_path and parameter_values are
# overwritten here.
(
    template_path,
    parameter_values,
) = automl_tabular_utils.get_skip_architecture_search_pipeline_and_parameters(
    PROJECT_ID,
    LOCATION,
    root_dir,
    target_column,
    prediction_type,
    optimization_objective,
    transform_config_path,
    train_budget_milli_node_hours,
    data_source_csv_filenames=data_source_csv_filenames,
    data_source_bigquery_table_path=data_source_bigquery_table_path,
    weight_column=weight_column,
    predefined_split_key=predefined_split_key,
    timestamp_split_key=timestamp_split_key,
    stratified_split_key=stratified_split_key,
    training_fraction=training_fraction,
    validation_fraction=validation_fraction,
    test_fraction=test_fraction,
    stage_1_tuning_result_artifact_uri=stage_1_tuning_result_artifact_uri,
    run_evaluation=run_evaluation,
    dataflow_subnetwork=dataflow_subnetwork,
    dataflow_use_public_ips=dataflow_use_public_ips,
)

# job_id and job are rebound to the second pipeline from here on; the first
# job's ID was saved earlier in automl_tabular_pipeline_job_name.
job_id = "automl-tabular-skip-architecture-search-unique"
job = aiplatform.PipelineJob(
    display_name=job_id,
    location=LOCATION,  # launches the pipeline job in the specified location
    template_path=template_path,
    job_id=job_id,
    pipeline_root=root_dir,
    parameter_values=parameter_values,
    enable_caching=False,
)

job.run()

# Task details of the skip-architecture-search run that just finished.
skip_architecture_search_pipeline_task_details = (
    job.gca_resource.job_detail.task_details
)

if export_additional_model_without_custom_ops:
    # Look the model up in THIS run's task details; the original code passed
    # the first pipeline's `pipeline_task_details` by mistake.
    # NOTE(review): the builder call above does not forward
    # export_additional_model_without_custom_ops — confirm this pipeline
    # actually produces the artifact when the flag is enabled.
    print(
        "trained model without custom TF ops:",
        get_no_custom_ops_model_uri(skip_architecture_search_pipeline_task_details),
    )

# Remember the job ID so the cleanup cell can retrieve and delete this job.
automl_tabular_skip_architecture_search_pipeline_job_name = job_id

## Clean up Vertex AI and Cloud Storage resources

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

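Otherwise, you can delete the individual resources you created in this tutorial:

- Vertex AI models uploaded by the two pipelines
- Vertex AI pipeline jobs
- Cloud Storage bucket (set `delete_bucket` to `True`)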

In [None]:
def get_task_detail(task_details: List[Any], task_name: str) -> Any:
    """Return the first task detail whose ``task_name`` equals ``task_name``.

    Args:
        task_details: Task detail objects exposing a ``task_name`` attribute
            (they are read by attribute, not as dictionaries).
        task_name: Name of the pipeline task to look up.

    Returns:
        The first matching task detail, or ``None`` when no task matches.
    """
    return next(
        (detail for detail in task_details if detail.task_name == task_name),
        None,
    )

In [None]:
# Cleanup order for each pipeline: fetch the job, delete the model it
# uploaded, then delete the job itself. The bucket is removed last and only
# when delete_bucket is set to True.

# Get the automl tabular training pipeline object
automl_tabular_pipeline_job = aiplatform.PipelineJob.get(
    f"projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{automl_tabular_pipeline_job_name}"
)

# fetch automl tabular training pipeline task details
pipeline_task_details = automl_tabular_pipeline_job.gca_resource.job_detail.task_details

# fetch model from automl tabular training pipeline and delete the model
# NOTE(review): the upload task is looked up as "model-upload-2" here but as
# "model-upload" for the second pipeline below — confirm against the pipeline graphs.
model_task = get_task_detail(pipeline_task_details, "model-upload-2")
model_resourceName = model_task.outputs["model"].artifacts[0].metadata["resourceName"]
model = aiplatform.Model(model_resourceName)
model.delete()

# Delete the automl tabular pipeline
automl_tabular_pipeline_job.delete()

# Get the automl tabular skip architecture search pipeline object
automl_tabular_skip_architecture_search_pipeline_job = aiplatform.PipelineJob.get(
    f"projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{automl_tabular_skip_architecture_search_pipeline_job_name}"
)

# fetch automl tabular skip architecture search pipeline task details
# (pipeline_task_details is rebound to the second job's tasks here)
pipeline_task_details = (
    automl_tabular_skip_architecture_search_pipeline_job.gca_resource.job_detail.task_details
)

# fetch model from automl tabular skip architecture search pipeline and delete the model
model_task = get_task_detail(pipeline_task_details, "model-upload")
model_resourceName = model_task.outputs["model"].artifacts[0].metadata["resourceName"]
model = aiplatform.Model(model_resourceName)
model.delete()

# Delete the automl tabular skip architecture search pipeline
automl_tabular_skip_architecture_search_pipeline_job.delete()

# Delete Cloud Storage objects that were created
# (recursively removes everything under BUCKET_URI when enabled)
delete_bucket = False  # Set True for deletion
if delete_bucket:
    ! gsutil -m rm -r $BUCKET_URI