In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Custom training with pre-built Google Cloud Pipeline Components

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Fcustom_model_training_and_batch_prediction.ipynb">
      <img width="32px" src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

## Overview


This tutorial demonstrates how to use Vertex AI Pipelines with pre-built components from Google Cloud Pipeline Components for custom training.

Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) and [Custom training components](https://cloud.google.com/vertex-ai/docs/training/create-training-pipeline).

### Objective

In this tutorial, you learn to use Vertex AI Pipelines and Google Cloud Pipeline Components to build a custom model.


This tutorial uses the following Vertex AI services:

- Vertex AI Pipelines
- Google Cloud Pipeline Components
- Vertex AI Training
- Vertex AI model resource
- Vertex AI endpoint resource

The steps performed include:

- Create a KFP pipeline:
    - Train a custom model.
    - Upload the trained model as a model resource.
    - Make a batch prediction request.

Learn more about [Google Cloud Pipeline Components](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline).

### Dataset

The dataset used for this tutorial is the [CIFAR10 dataset](https://www.tensorflow.org/datasets/catalog/cifar10) from [TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/overview). The version of the dataset you'll use is built into TensorFlow. The trained model predicts which of ten classes an image belongs to: airplane, automobile, bird, cat, deer, dog, frog, horse, ship, or truck.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Get Started

### Install Vertex AI SDK for Python and other required packages

In [None]:
! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components


! pip3 install --upgrade --force-reinstall tensorflow kfp google-cloud-aiplatform google-cloud-storage google-cloud-pipeline-components -q

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

# On Colab, shut the kernel down so that it restarts and picks up the packages
# installed above. Nothing happens in other environments.
if "google.colab" in sys.modules:
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>

### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.

In [None]:
import sys

# Authenticate the Colab user for Google Cloud access; other environments are
# left untouched.
if "google.colab" in sys.modules:
    from google.colab import auth as colab_auth

    colab_auth.authenticate_user()

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project. Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
# Google Cloud project and region used by every Vertex AI call in this notebook.
# The `# @param` comments turn these lines into Colab form fields.
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type: "string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
# Cloud Storage bucket (a gs:// URI) for pipeline artifacts, the trained model
# and batch-prediction inputs/outputs. The default embeds PROJECT_ID to make the
# name more likely to be unique.
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
# Create BUCKET_URI in LOCATION, owned by PROJECT_ID. Only run this when the
# bucket does not exist yet.
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

#### Service Account

**If you don't know your service account**, leave the placeholder in the first cell below unchanged and run the second cell, which looks up a service account with the `gcloud` command.

In [None]:
# Service account that runs the pipeline. Leave the placeholder to have the
# next cell look one up automatically.
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run these once per service account.

In [None]:
# Let SERVICE_ACCOUNT write (objectCreator) and read (objectViewer) objects in
# BUCKET_URI so that pipeline tasks can store and load their artifacts.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Import libraries and define constants

In [None]:
import json

import google.cloud.aiplatform as aiplatform
import tensorflow as tf
from google_cloud_pipeline_components.v1.custom_job import utils
from kfp import compiler, dsl
from kfp.dsl import component

#### Vertex AI Pipelines constants

Set up the following constants for Vertex AI Pipelines:

- `PIPELINE_ROOT` : Root folder to store pipeline artifacts in Cloud Storage bucket.

In [None]:
# Cloud Storage folder where Vertex AI Pipelines writes this pipeline's
# artifacts. Named after this tutorial's CIFAR-10 pipeline.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/cifar10"

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
# Set the default project, region and staging bucket for the aiplatform calls
# made later in this notebook.
aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)

### Set hardware accelerators

You can set hardware accelerators for training and prediction.

Set the variables `TRAIN_GPU/TRAIN_NGPU` and `DEPLOY_GPU/DEPLOY_NGPU` to use a container image supporting a GPU and the number of GPUs allocated to the virtual machine (VM) instance. For example, to use a GPU container image with 4 Nvidia Tesla T4 GPUs allocated to each VM, you'd specify:

    (aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_T4, 4)


Otherwise specify `(None, None)` to use a container image to run on a CPU.

Learn more about [hardware accelerator support for your region](https://cloud.google.com/vertex-ai/docs/general/locations#accelerators).

**Note**: TF releases before 2.3 for GPU support are expected to fail to load the custom model in this tutorial. It's a known issue and is fixed in TF 2.3. This is caused by static graph ops that are generated in the serving function. If you encounter this issue on your own custom models, use a container image for TF 2.3 with GPU support.

In [None]:
# Accelerator (type, count) pairs for training and deployment.
# (None, None) selects CPU-only container images.
TRAIN_GPU, TRAIN_NGPU = None, None
DEPLOY_GPU, DEPLOY_NGPU = None, None

### Set pre-built containers

Set the pre-built Docker container image for training and prediction.


For the latest list, see [Pre-built containers for training](https://cloud.google.com/ai-platform-unified/docs/training/pre-built-containers).


For the latest list, see [Pre-built containers for prediction](https://cloud.google.com/ai-platform-unified/docs/predictions/pre-built-containers).

In [None]:
TF = "2-13"

# Choose the CPU or GPU flavour of each pre-built image from the accelerator
# settings (a falsy *_GPU value means CPU).
TRAIN_VERSION = ("tf-gpu." if TRAIN_GPU else "tf-cpu.") + TF
DEPLOY_VERSION = ("tf2-gpu." if DEPLOY_GPU else "tf2-cpu.") + TF

TRAIN_IMAGE = f"gcr.io/vertex-ai/training/{TRAIN_VERSION}:latest"
DEPLOY_IMAGE = f"gcr.io/vertex-ai/prediction/{DEPLOY_VERSION}:latest"

print("Training:", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)
print("Deployment:", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)

### Set machine type

Next, set the machine type to use for training and prediction.

- Set the variables `TRAIN_COMPUTE` and `DEPLOY_COMPUTE` to configure the compute resources for the VMs you'll use for training and prediction.
 - `machine type`
     - `n1-standard`: 3.75GB of memory per vCPU.
     - `n1-highmem`: 6.5GB of memory per vCPU
     - `n1-highcpu`: 0.9 GB of memory per vCPU
 - `vCPUs`: number of \[2, 4, 8, 16, 32, 64, 96 \]

**Note**: The following isn't supported for training:

 - `standard`: 2 vCPUs
 - `highcpu`: 2, 4 and 8 vCPUs

**Note**: You may also use n2 and e2 machine types for training and deployment, but they don't support GPUs.

In [None]:
MACHINE_TYPE = "n1-standard"

# Training and deployment both use a 4-vCPU machine of MACHINE_TYPE.
VCPU = "4"

TRAIN_COMPUTE = "-".join([MACHINE_TYPE, VCPU])
print("Train machine type", TRAIN_COMPUTE)

DEPLOY_COMPUTE = "-".join([MACHINE_TYPE, VCPU])
print("Deploy machine type", DEPLOY_COMPUTE)

## Tutorial

Now you're ready to start training on CIFAR10 and create your own custom model.

### Examine the training package

#### Package layout

Before you start training, look at how a Python package is assembled for a custom training job. When unarchived, the package contains the following directory/file layout.

- PKG-INFO
- README.md
- setup.cfg
- setup.py
- trainer
  - \_\_init\_\_.py
  - task.py

The files `setup.cfg` and `setup.py` are the instructions for installing the package into the operating environment of the Docker image.

The file `trainer/task.py` is the Python script for executing the custom training job. **Note**: When you refer to the file in the worker pool specification, drop the file suffix (`.py`) and replace the directory slash with a dot (`trainer.task`).

#### Package Assembly

In the following cells, you assemble the training package.

In [None]:
# Assemble a minimal Python source-package layout in the local `custom/` folder.
# NOTE(review): apart from the final cleanup cell, nothing later in this
# notebook references `custom/`; the pipeline trains with the lightweight
# `custom_train_model` component instead. Confirm whether this package is still
# needed.

# Make folder for Python training script
! rm -rf custom
! mkdir custom

# Add package information
! touch custom/README.md

# The Python strings below contain real newlines; IPython substitutes them into
# the quoted `echo` argument, so each file is written verbatim.
setup_cfg = "[egg_info]\n\ntag_build =\n\ntag_date = 0"
! echo "$setup_cfg" > custom/setup.cfg

setup_py = "import setuptools\n\nsetuptools.setup(\n\n    install_requires=[\n\n        'tensorflow_datasets==1.3.0',\n\n    ],\n\n    packages=setuptools.find_packages())"
! echo "$setup_py" > custom/setup.py

pkg_info = "Metadata-Version: 1.0\n\nName: CIFAR10 image classification\n\nVersion: 0.0.0\n\nSummary: Demostration training script\n\nHome-page: www.google.com\n\nAuthor: Google\n\nAuthor-email: aferlitsch@google.com\n\nLicense: Public\n\nDescription: Demo\n\nPlatform: Vertex"
! echo "$pkg_info" > custom/PKG-INFO

# Make the training subfolder
! mkdir custom/trainer
! touch custom/trainer/__init__.py

### Create a custom component for training the custom model

Next, create a lightweight Python function component for training the CIFAR10 image classification model.

In [None]:
# Single, Mirror and Multi-Machine Distributed Training for CIFAR-10


@component(
    base_image="tensorflow/tensorflow:2.13.0",
    packages_to_install=["tensorflow_datasets", "opencv-python-headless"],
)
def custom_train_model(
    model_dir: str,
    lr: float = 0.01,
    epochs: int = 10,
    steps: int = 200,
    distribute: str = "single",
):
    """Train a CNN on CIFAR-10 and export it for Vertex AI prediction.

    The component runs inside ``base_image``, so every module it needs is
    imported inside the function body. It:

    1. Trains a small Keras CNN with the requested distribution strategy.
    2. Saves a Keras copy to ``model_dir + ".keras"``, reloads it and evaluates
       it on the CIFAR-10 test split.
    3. Exports a SavedModel to ``model_dir`` whose ``serving_default``
       signature accepts a batch of JPEG-encoded image bytes.
    4. Writes two JPEG test images and a batch-prediction input file
       (``test.jsonl``) under ``model_dir + "/test"``.

    Args:
        model_dir: Destination of the exported SavedModel; the Keras copy and
            the test files are written next to it.
        lr: Learning rate of the SGD optimizer.
        epochs: Number of training epochs.
        steps: Number of steps per epoch.
        distribute: "single" (one device), "mirror" (MirroredStrategy on one
            machine) or "multi" (MultiWorkerMirroredStrategy).

    Raises:
        ValueError: If ``distribute`` is not one of the supported values.
    """
    import base64
    import faulthandler
    import json
    import os
    import sys

    import cv2
    import numpy as np
    import tensorflow as tf
    import tensorflow_datasets as tfds
    from tensorflow import keras
    from tensorflow.keras.datasets import cifar10
    from tensorflow.python.client import device_lib

    faulthandler.enable()
    tfds.disable_progress_bar()

    print("Component start")

    print("Python Version = {}".format(sys.version))
    print("TensorFlow Version = {}".format(tf.__version__))
    print("TF_CONFIG = {}".format(os.environ.get("TF_CONFIG", "Not found")))
    print("DEVICES", device_lib.list_local_devices())

    # Choose the distribution strategy.
    if distribute == "single":
        # Single machine, single compute device.
        if tf.config.list_physical_devices("GPU"):
            strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
        else:
            strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
    elif distribute == "mirror":
        # Single machine, multiple compute devices.
        strategy = tf.distribute.MirroredStrategy()
    elif distribute == "multi":
        # Multiple machines, multiple compute devices.
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
    else:
        # Fail fast instead of hitting an unbound `strategy` further down.
        raise ValueError(
            "distribute must be 'single', 'mirror' or 'multi', got {!r}".format(
                distribute
            )
        )

    # Multi-worker configuration
    print("num_replicas_in_sync = {}".format(strategy.num_replicas_in_sync))

    # Preparing dataset
    BUFFER_SIZE = 10000
    BATCH_SIZE = 64

    def make_datasets_unbatched():
        """Return the shuffled, endlessly repeated CIFAR-10 train split."""

        def scale(image, label):
            # Scale pixels from [0, 255] to float32 values in [0., 1.].
            image = tf.cast(image, tf.float32)
            image /= 255.0
            return image, label

        datasets, _ = tfds.load(name="cifar10", with_info=True, as_supervised=True)
        return datasets["train"].map(scale).cache().shuffle(BUFFER_SIZE).repeat()

    def build_and_compile_cnn_model(lr: float = 0.01):
        """Build and compile a small CNN classifier for 32x32x3 images."""
        model = keras.Sequential(
            [
                keras.layers.Conv2D(32, 3, activation="relu", input_shape=(32, 32, 3)),
                keras.layers.MaxPooling2D(),
                keras.layers.Conv2D(32, 3, activation="relu"),
                keras.layers.MaxPooling2D(),
                keras.layers.Flatten(),
                keras.layers.Dense(10, activation="softmax"),
            ]
        )
        model.compile(
            loss=keras.losses.sparse_categorical_crossentropy,
            optimizer=keras.optimizers.SGD(learning_rate=lr),
            metrics=["accuracy"],
        )
        return model

    # Train the model.
    NUM_WORKERS = strategy.num_replicas_in_sync
    # `tf.data.Dataset.batch` expects the global batch size, so scale the
    # per-replica batch size by the number of replicas.
    GLOBAL_BATCH_SIZE = BATCH_SIZE * NUM_WORKERS
    train_dataset = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE)

    with strategy.scope():
        # Build and compile the model inside `strategy.scope()` so that its
        # variables are created by the strategy.
        model = build_and_compile_cnn_model(lr)

    model.fit(x=train_dataset, epochs=epochs, steps_per_epoch=steps)

    # Save, then reload, a Keras copy of the model.
    keras_model_path = model_dir + ".keras"
    model.save(keras_model_path)
    model_path_to_deploy = model_dir
    local_model = keras.models.load_model(keras_model_path)

    # Evaluate on the CIFAR-10 test split, scaled the same way as training data.
    (_, _), (x_test, y_test) = cifar10.load_data()
    x_test = (x_test / 255.0).astype(np.float32)
    print(x_test.shape, y_test.shape)
    local_model.evaluate(x_test, y_test)

    # Serving function: decode JPEG bytes into the model's float input.
    CONCRETE_INPUT = "numpy_inputs"

    def _preprocess(bytes_input):
        decoded = tf.io.decode_jpeg(bytes_input, channels=3)
        # convert_image_dtype scales uint8 to [0, 1], matching training.
        decoded = tf.image.convert_image_dtype(decoded, tf.float32)
        resized = tf.image.resize(decoded, size=(32, 32))
        return resized

    @tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
    def preprocess_fn(bytes_inputs):
        decoded_images = tf.map_fn(
            _preprocess, bytes_inputs, fn_output_signature=tf.float32
        )
        # The key must match the model's concrete input name.
        return {CONCRETE_INPUT: decoded_images}

    m_call = tf.function(local_model.call).get_concrete_function(
        [tf.TensorSpec(shape=[None, 32, 32, 3], dtype=tf.float32, name=CONCRETE_INPUT)]
    )

    @tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
    def serving_fn(bytes_inputs):
        images = preprocess_fn(bytes_inputs)
        prob = m_call(**images)
        return prob

    tf.saved_model.save(
        local_model, model_path_to_deploy, signatures={"serving_default": serving_fn}
    )

    # Get the serving function's input name; it keys each batch instance.
    loaded = tf.saved_model.load(model_path_to_deploy)
    serving_input = list(
        loaded.signatures["serving_default"].structured_input_signature[1].keys()
    )[0]
    print("Serving function input:", serving_input)

    # Write the first two test images as local JPEG files.
    test_dir = model_dir + "/test"
    print(x_test[0].shape)
    local_files = []
    for index, test_image in enumerate(x_test[:2], start=1):
        local_file = "tmp{}.jpg".format(index)
        # CIFAR-10 arrays are RGB but OpenCV writes BGR; convert first so the
        # JPEG decodes back to the original colours in `_preprocess`.
        bgr_image = cv2.cvtColor(
            (test_image * 255).astype(np.uint8), cv2.COLOR_RGB2BGR
        )
        cv2.imwrite(local_file, bgr_image)
        local_files.append(local_file)

    print("Writing jpg files")

    # Copy the test items next to the model for the batch prediction job.
    test_items = []
    for local_file in local_files:
        test_item = test_dir + "/" + local_file
        tf.io.gfile.copy(local_file, test_item, overwrite=True)
        test_items.append(test_item)

    # Build the batch-prediction input: one JSON line per image, holding the
    # base64-encoded JPEG under the serving input name.
    gcs_input_uri = test_dir + "/test.jsonl"
    with tf.io.gfile.GFile(gcs_input_uri, "w") as f:
        for test_item in test_items:
            image_bytes = tf.io.read_file(test_item)
            b64str = base64.b64encode(image_bytes.numpy()).decode("utf-8")
            f.write(json.dumps({serving_input: {"b64": b64str}}) + "\n")

### Convert the component to a Vertex AI Custom Job

Next, use the `create_custom_training_job_op_from_component` method to convert the custom component into a Vertex AI custom job pre-built component.

**replica_count :** The number of worker replicas (machines) that run the custom training job. The default is 1.

In [None]:
# Wrap the lightweight `custom_train_model` component so that it runs as a
# Vertex AI custom job with a single replica (replica_count=1).
custom_job_distributed_training_op = utils.create_custom_training_job_op_from_component(
    custom_train_model, replica_count=1
)

### Define the pipeline for the custom training job

Next, define the pipeline job that runs the following tasks:

- Trains the custom model.
- Uploads the model to Vertex AI Model Registry.
- Executes a batch prediction job.

In [None]:
MODEL_DIR = BUCKET_URI + "/model"


@dsl.pipeline(name="custom-model-training-sample-pipeline")
def pipeline(
    model_dir: str = MODEL_DIR,
    lr: float = 0.01,
    epochs: int = 10,
    steps: int = 200,
    distribute: str = "single",
):
    """Train on CIFAR-10, upload the model and run a batch prediction.

    Args:
        model_dir: Where the training task exports the SavedModel; it is
            imported as the model artifact to upload.
        lr: Learning rate forwarded to the training component.
        epochs: Number of epochs forwarded to the training component.
        steps: Steps per epoch forwarded to the training component.
        distribute: Distribution strategy forwarded to the training component.
    """
    from google_cloud_pipeline_components.types import artifact_types
    from google_cloud_pipeline_components.v1.batch_predict_job import \
        ModelBatchPredictOp
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
    from kfp.dsl import importer_node

    # Train the model as a Vertex AI custom job.
    custom_producer_task = custom_job_distributed_training_op(
        model_dir=model_dir,
        lr=lr,
        epochs=epochs,
        steps=steps,
        distribute=distribute,
        project=PROJECT_ID,
        location=LOCATION,
        base_output_directory=PIPELINE_ROOT,
    )

    # Describe the exported SavedModel as an unmanaged container model served
    # by the pre-built TF 2.13 CPU prediction image.
    # NOTE(review): this image is hard-coded and does not follow DEPLOY_GPU.
    unmanaged_model_importer = importer_node.importer(
        artifact_uri=model_dir,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest"
            }
        },
    )

    # Upload to the Model Registry in the same region as training. Without an
    # explicit `location`, the component default region is used.
    model_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        location=LOCATION,
        display_name="model_display_name",
        unmanaged_container_model=unmanaged_model_importer.outputs["artifact"],
    )
    # The importer does not consume the training output, so order it explicitly.
    model_upload_op.after(custom_producer_task)

    batch_predict_op = ModelBatchPredictOp(
        project=PROJECT_ID,
        location=LOCATION,
        job_display_name="batch_predict_job",
        model=model_upload_op.outputs["model"],
        # NOTE(review): built from the compile-time MODEL_DIR, not the
        # `model_dir` parameter; keep them in sync when overriding model_dir.
        gcs_source_uris=[MODEL_DIR + "/test/test.jsonl"],
        gcs_destination_output_uri_prefix=PIPELINE_ROOT,
        instances_format="jsonl",
        predictions_format="jsonl",
        model_parameters={},
        machine_type=DEPLOY_COMPUTE,
        starting_replica_count=1,
        max_replica_count=1,
    )

    batch_predict_op.after(model_upload_op)

### Compile and run the pipeline
Next, compile the pipeline into a DAG and then execute it.

In [None]:
# Compile the pipeline function into a pipeline spec (YAML) on local disk.
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="custom_model_training_spec.yaml"
)

DISPLAY_NAME = "cifar10"

# Create a Vertex AI pipeline run from the compiled spec; its artifacts go
# under PIPELINE_ROOT.
job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="custom_model_training_spec.yaml",
    pipeline_root=PIPELINE_ROOT,
)

# Submit the run as SERVICE_ACCOUNT.
# NOTE(review): the following cells read this run's outputs, which assumes
# `run()` waits for the pipeline to finish. Confirm against the SDK default.
job.run(service_account=SERVICE_ACCOUNT)

### View custom training pipeline results

Finally, you view the artifact outputs of each task in the pipeline.

In [None]:
PROJECT_NUMBER = job.gca_resource.name.split("/")[1]
print("Project number:", PROJECT_NUMBER)


def print_pipeline_output(job, output_task_name):
    JOB_ID = job.name
    print(JOB_ID)
    for _ in range(len(job.gca_resource.job_detail.task_details)):
        TASK_ID = job.gca_resource.job_detail.task_details[_].task_id
        EXECUTE_OUTPUT = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/executor_output.json"
        )
        GCP_RESOURCES = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/gcp_resources"
        )
        EVAL_METRICS = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/evaluation_metrics"
        )
        if tf.io.gfile.exists(EXECUTE_OUTPUT):
            ! gsutil cat $EXECUTE_OUTPUT
            return EXECUTE_OUTPUT
        elif tf.io.gfile.exists(GCP_RESOURCES):
            ! gsutil cat $GCP_RESOURCES
            return GCP_RESOURCES
        elif tf.io.gfile.exists(EVAL_METRICS):
            ! gsutil cat $EVAL_METRICS
            return EVAL_METRICS

    return None


def _read_task_output(task_name):
    """Print a pipeline task's output file and return its parsed JSON.

    Args:
        task_name: Task-name prefix passed to ``print_pipeline_output``.

    Returns:
        The decoded JSON content of the task's output file.

    Raises:
        RuntimeError: If no output file was found for ``task_name``.
    """
    print(task_name)
    artifacts_path = print_pipeline_output(job, task_name)
    print("\n")
    if artifacts_path is None:
        # Fail with a clear message instead of running `gsutil cat None`.
        raise RuntimeError(
            "No output file found for pipeline task '{}'; check that the "
            "pipeline job completed successfully.".format(task_name)
        )
    # Parse the whole file rather than only its first line.
    with tf.io.gfile.GFile(artifacts_path, "r") as f:
        return json.load(f)


output = _read_task_output("model-upload")
print(output)
model_id = output["artifacts"]["model"]["artifacts"][0]["metadata"]["resourceName"]

output = _read_task_output("model-batch-predict")
batch_job_id = output["artifacts"]["batchpredictionjob"]["artifacts"][0]["metadata"][
    "resourceName"
]

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial.

In [None]:
# Clean up the resources created by this tutorial. `model_id` and
# `batch_job_id` are the resource names read from the pipeline outputs above.

# Delete the model
model = aiplatform.Model(model_id)
model.delete()

# Delete the batch prediction job
batch_job = aiplatform.BatchPredictionJob(batch_job_id)
batch_job.delete()

# Delete the pipeline
job.delete()

# Delete the Cloud Storage bucket
delete_bucket = False  # Set True for deletion
if delete_bucket:
    ! gsutil rm -r $BUCKET_URI

# Remove the locally generated files
! rm custom_model_training_spec.yaml
! rm -rf custom