In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Distributed Training with Reduction Server

## Overview

This notebook demonstrates how to optimize large distributed training jobs with the Vertex Training Reduction Server. The [TensorFlow NLP Modelling Toolkit](https://github.com/tensorflow/models/tree/master/official/nlp#tensorflow-nlp-modelling-toolkit) from the [TensorFlow Model Garden](https://github.com/tensorflow/models) is used to preprocess the data and create the model and training loop. `MultiWorkerMirroredStrategy` from the `tf.distribute` module is used to distribute model training across multiple machines. 

### Dataset

The dataset used for this tutorial is the [Multi-Genre Natural Language Inference Corpus (MNLI)](https://www.tensorflow.org/datasets/catalog/glue#gluemnli) from the GLUE benchmark. A preprocessed version of this dataset is loaded from a Cloud Storage bucket, and used to fine tune a BERT model for sentence prediction.


### Objective

In this notebook, you create a custom-trained model from a Python script in a Docker container. You learn how to configure, submit, and monitor a Vertex Training job that uses Reduction Server to optimize network bandwidth and latency of the gradient reduction operation in distributed training.

The steps performed include:

- Create a Vertex Custom Training job that uses Reduction Server.
- Submit and monitor the job.
- Clean up resources.

### Costs 


This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage


Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Set up your local development environment

**If you are using Colab or Google Cloud Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step.

**Otherwise**, make sure your environment meets this notebook's requirements.
You need the following:

* The Google Cloud SDK
* Git
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements. The following steps provide a condensed set of
instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)

1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)

1. [Install
   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)
   and create a virtual environment that uses Python 3. Activate the virtual environment.

1. To install Jupyter, run `pip3 install jupyter` on the
command-line in a terminal shell.

1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.

1. Open this notebook in the Jupyter Notebook Dashboard.

### Install the required packages

In [None]:
import os

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# Google Cloud Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_GOOGLE_CLOUD_NOTEBOOK:
    USER_FLAG = "--user"

Install the latest version of the Vertex SDK for Python.

In [None]:
! pip3 install --upgrade google-cloud-aiplatform $USER_FLAG

Install the latest version of the Google Cloud Storage library.

In [None]:
! pip3 install --upgrade google-cloud-storage $USER_FLAG

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Before you begin


### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. [Enable the Vertex AI API and Compute Engine API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component).

1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

1. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [None]:
import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Otherwise, set your project ID here.

In [None]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
! gcloud config set project {PROJECT_ID}

### Authenticate your Google Cloud account

**If you are using Google Cloud Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via OAuth.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI"
into the filter box, and select
   **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

5. Click **Create**. A JSON file that contains your key downloads to your
local environment.

6. Enter the path to your service account key as the
`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [None]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# If on Google Cloud Notebooks, then don't execute this code
if not IS_GOOGLE_CLOUD_NOTEBOOK:
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**


In this example, your training application uses Cloud Storage for accessing training and validation datasets and for storing checkpoints. 

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

You may also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. Make sure to [choose a region where Vertex AI services are
available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions). You may
not use a Multi-Regional Storage bucket for training with Vertex AI.

In [None]:
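# Include the gs:// prefix: gsutil and the MODEL_DIR path built later expect a full bucket URI.
BUCKET_NAME = "gs://[your-bucket-name]"  # @param {type:"string"}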
REGION = "[your-region]"  # @param {type:"string"}

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
! gsutil ls -al $BUCKET_NAME

### Import libraries and define constants

In [None]:
import time

from google.cloud import aiplatform

### Initialize Vertex SDK

In [None]:
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)

### Set dataset location

The Multi-Genre Natural Language Inference Corpus (MNLI) dataset has been preprocessed to a format required by the TensorFlow NLP Modelling Toolkit and uploaded to a public Cloud Storage bucket.

In [None]:
DATASET_LOCATION = "gs://cloud-samples-data/vertex-ai/community-content/datasets/MNLI"
TRAIN_FILE = f"{DATASET_LOCATION}/mnli_train.tf_record"
EVAL_FILE = f"{DATASET_LOCATION}/mnli_valid.tf_record"
METADATA_FILE = f"{DATASET_LOCATION}/metadata.json"

In [None]:
# List the files

! gsutil ls {DATASET_LOCATION}

In [None]:
# Examine dataset metadata

! gsutil cat {METADATA_FILE}

### Create a training container
#### Write Dockerfile

The first step in containerizing your code is to create a Dockerfile. In the Dockerfile, you include all the commands needed to build and run the image, such as installing the necessary libraries and setting up the entry point for the training code.

This Dockerfile uses the Vertex pre-built TensorFlow 2.5 GPU Docker image as a base image. After downloading that image, this Dockerfile installs the TensorFlow Official Models and TensorFlow Text libraries. The Reduction Server NCCL plugin is pre-installed in the base image.

In [None]:
# Create training image directory

! mkdir training_image

In [None]:
%%writefile training_image/Dockerfile

FROM us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-5:latest
    
WORKDIR /

# Installs Official Models and Text libraries
RUN pip install tf-models-official==2.5.1 tensorflow-text==2.5.0


# Copies the trainer code to the docker image.
COPY trainer /trainer

#### Create training application code

Next, you create a `training_image/trainer` directory with a `train.py` script that contains the code for your training application.

The `train.py` training script is based on [the common training driver](https://github.com/tensorflow/models/blob/master/official/nlp/docs/train.md) from the TensorFlow NLP Modelling Toolkit. The common training driver script supports multiple NLP tasks (e.g., pre-training, GLUE and SQuAD fine-tuning) and multiple models (e.g., BERT, ALBERT). 

A set of configurations for a specific NLP task is called an experiment. The toolkit includes a set of pre-defined experiments. When you invoke the script, you specify an experiment type using the `--experiment` command line argument. There are two options for overriding the default experiment configuration:
- Specify one or multiple YAML configurations with updated settings using the `--config_file` command line argument
- Provide updated settings as a list of key/value pairs through the `--params_override` command line argument

If you specify both `--config_file` and `--params_override`, the settings in `--params_override` take precedence.
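
The precedence rule can be pictured as layered dictionary merging, where later layers win. The following is a minimal sketch of that idea in plain Python, not the toolkit's `parse_configuration` implementation. The helper names (`merge_into`, `parse_pairs`, `resolve`) and the default values are made up for illustration.

```python
import copy


def merge_into(target, updates):
    """Recursively copy values from `updates` into `target`, in place."""
    for key, value in updates.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            merge_into(target[key], value)
        else:
            target[key] = value


def parse_pairs(text):
    """Turn 'a.b=1,c.d=x' into a nested dict (hypothetical helper)."""
    nested = {}
    for pair in text.split(","):
        dotted_key, raw = pair.split("=", 1)
        *parents, leaf = dotted_key.split(".")
        node = nested
        for name in parents:
            node = node.setdefault(name, {})
        # Keep the sketch simple: only plain integers are converted.
        node[leaf] = int(raw) if raw.isdigit() else raw
    return nested


def resolve(defaults, config_file, params_override):
    """Apply layers in order, so the last layer wins on conflicts."""
    merged = copy.deepcopy(defaults)
    merge_into(merged, config_file)
    merge_into(merged, parse_pairs(params_override))
    return merged


# Illustrative values only; they are not the toolkit's real defaults.
defaults = {"trainer": {"train_steps": 1000, "steps_per_loop": 1000}}
config_file = {"trainer": {"train_steps": 36813}}
resolved = resolve(defaults, config_file, "trainer.train_steps=2000")
print(resolved["trainer"])
```

In this sketch, `train_steps` ends up with the value from the override string, while `steps_per_loop` keeps its default because neither layer touches it.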

Retrieving the default experiment configuration and merging user provided settings is encapsulated in the `official.core.train_utils.parse_configuration` utility function from the TensorFlow Model Garden.

In the following cell, the [common training driver script](https://github.com/tensorflow/models/blob/master/official/nlp/train.py) has been adapted to run in the distributed compute environment that Vertex Training provisions for a job. The TensorFlow NLP Modelling Toolkit uses [Orbit](https://github.com/tensorflow/models/tree/master/orbit) to implement a custom training loop. The custom training loop saves checkpoints, writes TensorBoard summaries, and saves a trained model to a storage location specified through the `--model_dir` command line argument.

Note that when using the base common training driver in a distributed setting, each worker uses the same base storage location. To avoid conflicts when multiple workers write to the same storage location, the driver code has been modified so that each worker uses a different storage location based on its role in a Vertex compute cluster. This logic is implemented in the `_get_model_dir` utility function.
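
To make the directory rule concrete, here is a small standalone sketch of how a worker's role maps to a storage location. It mirrors the rule described above but is not the training script itself: `resolve_model_dir` is a hypothetical name, the bucket path is a placeholder, and the `TF_CONFIG` values are trimmed to the `task` field only (a real value also describes the cluster).

```python
import json
import posixpath


def resolve_model_dir(model_dir, tf_config_json):
    """Return the chief's directory unchanged, or a worker-specific subdirectory."""
    if not tf_config_json:
        return model_dir  # No TF_CONFIG: single-machine run.
    task = json.loads(tf_config_json)["task"]
    if task["type"] == "chief" and task["index"] == 0:
        return model_dir
    # posixpath keeps the '/' separator that gs:// paths need on any OS.
    return posixpath.join(model_dir, "worker-{}".format(task["index"]))


# Hypothetical bucket and job names, for illustration only.
base_dir = "gs://example-bucket/MNLI_job/model"
for task in (
    {"type": "chief", "index": 0},
    {"type": "worker", "index": 0},
    {"type": "worker", "index": 1},
):
    tf_config = json.dumps({"task": task})
    print(task["type"], task["index"], "->", resolve_model_dir(base_dir, tf_config))
```

Only the chief writes to the directory you pass in; every other worker gets its own `worker-N` subdirectory, so no two replicas write to the same location.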


In [None]:
# Create trainer directory

! mkdir training_image/trainer

In [None]:
%%writefile training_image/trainer/train.py

# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""TFM common training driver."""

import json
import os

from absl import app
from absl import flags
from absl import logging
import gin
from official.common import distribute_utils
from official.common import flags as tfm_flags
from official.common import registry_imports
from official.core import task_factory
from official.core import train_lib
from official.core import train_utils
from official.modeling import performance

FLAGS = flags.FLAGS


def _get_model_dir(model_dir):
  """Returns the model directory that the current worker should write to.

  In a multi-worker scenario, the chief worker will save to the
  desired model directory, while the other workers will save the model to
  temporary directories. It’s important that these temporary directories
  are unique in order to prevent multiple workers from writing to the same
  location. Saving can contain collective ops, so all workers must save and
  not just the chief.
  """

  def _is_chief(task_type, task_id):
    return ((task_type == 'chief' and task_id == 0) or task_type is None)

  tf_config = os.getenv('TF_CONFIG')
  if tf_config:
    tf_config = json.loads(tf_config)

    if not _is_chief(tf_config['task']['type'], tf_config['task']['index']):
      # Format only the suffix so braces in model_dir cannot break formatting.
      model_dir = os.path.join(
          model_dir, 'worker-{}'.format(tf_config['task']['index']))

  logging.info('Setting model_dir to: %s', model_dir)

  return model_dir


def main(_):

  model_dir = _get_model_dir(FLAGS.model_dir)

  gin.parse_config_files_and_bindings(FLAGS.gin_file, FLAGS.gin_params)
  params = train_utils.parse_configuration(FLAGS)

  if 'train' in FLAGS.mode:
    # Pure eval modes do not output yaml files. Otherwise continuous eval job
    # may race against the train job for writing the same file.
    train_utils.serialize_config(params, model_dir)

  # Sets mixed_precision policy. Using 'mixed_float16' or 'mixed_bfloat16'
  # can have significant impact on model speeds by utilizing float16 in case of
  # GPUs, and bfloat16 in the case of TPUs. loss_scale takes effect only when
  # dtype is float16
  if params.runtime.mixed_precision_dtype:
    performance.set_mixed_precision_policy(params.runtime.mixed_precision_dtype)
  distribution_strategy = distribute_utils.get_distribution_strategy(
      distribution_strategy=params.runtime.distribution_strategy,
      all_reduce_alg=params.runtime.all_reduce_alg,
      num_gpus=params.runtime.num_gpus,
      tpu_address=params.runtime.tpu,
      **params.runtime.model_parallelism())
  with distribution_strategy.scope():
    task = task_factory.get_task(params.task, logging_dir=model_dir)

  train_lib.run_experiment(
      distribution_strategy=distribution_strategy,
      task=task,
      mode=FLAGS.mode,
      params=params,
      model_dir=model_dir)

  train_utils.save_gin_config(FLAGS.mode, model_dir)


if __name__ == '__main__':
  tfm_flags.define_flags()
  app.run(main)

####  Create base settings for the MNLI fine tuning experiment

The TensorFlow NLP Modelling Toolkit includes a predefined experiment for a set of text classification tasks, the `bert/sentence_prediction` experiment. The base settings in the `bert/sentence_prediction` experiment need to be updated for the MNLI fine tuning task you perform in this example.

In the next cell, you update the settings by creating a YAML configuration file that will be referenced when invoking a training script. As noted earlier, you can fine tune these settings even further for each training run by using the `--params_override` flag.

The configuration file has three sections: `task`, `trainer`, and `runtime`.
* The `task` section contains settings specific to your machine learning task, including a URI to a pre-trained BERT model,  training and validation datasets settings, and evaluation metrics. 
* The `trainer` section configures the settings that control the custom training loop, such as the checkpoint interval and the number of training steps.
* The `runtime` section includes the settings for a training runtime: a distributed training strategy, GPU configurations, etc.
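
The step counts in the `trainer` section follow from the dataset sizes noted in the configuration file's comments. The arithmetic below is a reconstruction inferred from those comments, not something the toolkit documents. In particular, treating `validation_interval` as roughly half an epoch is an assumption.

```python
import math

TRAIN_EXAMPLES = 392_702  # from the comment in the config file
EVAL_EXAMPLES = 9_815     # from the comment in the config file
BATCH_SIZE = 32           # global_batch_size in the config file
EPOCHS = 3

# drop_remainder is true for training, so partial batches are discarded.
steps_per_epoch = TRAIN_EXAMPLES // BATCH_SIZE
train_steps = steps_per_epoch * EPOCHS

# The config comments describe warmup as roughly 10% of train_steps.
warmup_steps = train_steps // 10

# drop_remainder is false for validation, so the final partial batch counts.
validation_steps = math.ceil(EVAL_EXAMPLES / BATCH_SIZE)

# Assumption: validation runs about twice per epoch.
validation_interval = steps_per_epoch // 2

print(train_steps, warmup_steps, validation_steps, validation_interval)
```

If you change the batch size or the number of epochs, recomputing these values the same way keeps the learning rate schedule and validation cadence aligned with the data.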

In [None]:
%%writefile training_image/trainer/glue_mnli_matched.yaml

task:
  hub_module_url: 'https://tfhub.dev/tensorflow/bert_en_uncased_L-24_H-1024_A-16/4'
  model:
    num_classes: 3
  init_checkpoint: ''
  metric_type: 'accuracy'
  train_data:
    drop_remainder: true
    global_batch_size: 32
    input_path: ''
    is_training: true
    seq_length: 128
    label_type: 'int'
  validation_data:
    drop_remainder: false
    global_batch_size: 32
    input_path: ''
    is_training: false
    seq_length: 128
    label_type: 'int'
trainer:
  checkpoint_interval: 3000
  optimizer_config:
    learning_rate:
      polynomial:
        # 100% of train_steps.
        decay_steps: 36813
        end_learning_rate: 0.0
        initial_learning_rate: 3.0e-05
        power: 1.0
      type: polynomial
    optimizer:
      type: adamw
    warmup:
      polynomial:
        power: 1
        # ~10% of train_steps.
        warmup_steps: 3681
      type: polynomial
  steps_per_loop: 1000
  summary_interval: 1000
  # Training data size 392,702 examples, 3 epochs.
  train_steps: 36813
  validation_interval: 6135
  # Eval data size = 9815 examples.
  validation_steps: 307
  best_checkpoint_export_subdir: 'best_ckpt'
  best_checkpoint_eval_metric: 'cls_accuracy'
  best_checkpoint_metric_comp: 'higher'
runtime:
  distribution_strategy: 'multi_worker_mirrored'
  all_reduce_alg: 'nccl'

### Build the container

In the next cells, you build the container and push it to Google Container Registry.

In [None]:
TRAIN_IMAGE = f"gcr.io/{PROJECT_ID}/mnli_finetuning"

In [None]:
! docker build -t {TRAIN_IMAGE} training_image

In [None]:
! docker push {TRAIN_IMAGE}

### Create a custom training job

When you run a distributed training job with Vertex AI, you specify multiple machines (nodes) in a training cluster. The training service allocates the resources for the machine types you specify. Your running job on a given node is called a replica. A group of replicas with the same configuration is called a worker pool. Vertex Training provides 4 worker pools to cover the different types of machine tasks. To use the Reduction Server, you'll need to use 3 of the 4 available worker pools.

* **Worker pool 0** configures the Primary, chief, scheduler, or "master".  This worker generally takes on some extra work such as saving checkpoints and writing summary files. There is only ever one chief worker in a cluster, so your worker count for worker pool 0 will always be 1.

* **Worker pool 1** is where you configure the rest of the workers for your cluster. 
 
* **Worker pool 2** manages Reduction Server reducers. 

Worker pools 0 and 1 run the custom training container you created in the previous step. Worker pool 2 uses the Reduction Server image provided by Vertex AI.

The helper function below creates worker pool specifications using the described worker pool topology.
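
For orientation, the list that such a helper returns has roughly the following shape for a two-machine GPU cluster with four reducers. This is an illustrative literal, not output captured from the helper: the trainer image URI is a placeholder, and the `args` and `command` fields are omitted for brevity.

```python
# Hypothetical image URI; the notebook builds the real one in an earlier step.
TRAINER_IMAGE = "gcr.io/example-project/mnli_finetuning"
REDUCER_IMAGE = (
    "us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest"
)

gpu_machine = {
    "machine_type": "a2-highgpu-4g",
    "accelerator_type": "NVIDIA_TESLA_A100",
    "accelerator_count": 4,
}
trainer_container = {"image_uri": TRAINER_IMAGE}

example_pool_specs = [
    # Worker pool 0: exactly one chief replica.
    {
        "replica_count": 1,
        "machine_spec": gpu_machine,
        "container_spec": trainer_container,
    },
    # Worker pool 1: the remaining GPU workers.
    {
        "replica_count": 1,
        "machine_spec": gpu_machine,
        "container_spec": trainer_container,
    },
    # Worker pool 2: Reduction Server reducers on CPU-only machines.
    {
        "replica_count": 4,
        "machine_spec": {"machine_type": "n1-highcpu-16"},
        "container_spec": {"image_uri": REDUCER_IMAGE},
    },
]

for index, pool in enumerate(example_pool_specs):
    print(
        "pool", index,
        "replicas:", pool["replica_count"],
        "machine:", pool["machine_spec"]["machine_type"],
    )
```

The position in the list determines the role: the reducers must be the third entry, which is why the GPU workers are split across the first two entries.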

In [None]:
def prepare_worker_pool_specs(
    image_uri,
    args,
    cmd,
    replica_count=1,
    machine_type="n1-standard-4",
    accelerator_count=0,
    accelerator_type="ACCELERATOR_TYPE_UNSPECIFIED",
    reduction_server_count=0,
    reduction_server_machine_type="n1-highcpu-16",
    reduction_server_image_uri="us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest",
):

    if accelerator_count > 0:
        machine_spec = {
            "machine_type": machine_type,
            "accelerator_type": accelerator_type,
            "accelerator_count": accelerator_count,
        }
    else:
        machine_spec = {"machine_type": machine_type}

    container_spec = {
        "image_uri": image_uri,
        "args": args,
        "command": cmd,
    }

    chief_spec = {
        "replica_count": 1,
        "machine_spec": machine_spec,
        "container_spec": container_spec,
    }

    worker_pool_specs = [chief_spec]
    if replica_count > 1:
        workers_spec = {
            "replica_count": replica_count - 1,
            "machine_spec": machine_spec,
            "container_spec": container_spec,
        }
        worker_pool_specs.append(workers_spec)

    if reduction_server_count > 0:
        workers_spec = {
            "replica_count": reduction_server_count,
            "machine_spec": {
                "machine_type": reduction_server_machine_type,
            },
            "container_spec": {"image_uri": reduction_server_image_uri},
        }
        worker_pool_specs.append(workers_spec)

    return worker_pool_specs

#### Configure worker pools

When choosing the number and type of reducers, you should consider the network bandwidth supported by a reducer replica’s machine type. In GCP, a VM’s machine type defines its maximum possible egress bandwidth. For example, the egress bandwidth of the `n1-highcpu-16` machine type is limited to 32 Gbps.

Because reducers perform a very limited function, aggregating blocks of gradients, they can run on relatively low-powered and cost-effective machines. Even with a large number of gradients, this computation does not require accelerated hardware or high CPU or memory resources. However, to avoid network bottlenecks, the total aggregate bandwidth of all replicas in the reducer worker pool must be greater than or equal to the total aggregate bandwidth of all replicas in worker pools 0 and 1, which host the GPU workers.
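
The bandwidth rule translates into a simple lower bound on the number of reducers. The sketch below works through it for a two-machine cluster. The 32 Gbps reducer figure comes from the text above, while the 50 Gbps figure for the GPU worker machine type is an assumption that you should check against the Compute Engine documentation. `min_reducer_count` is a hypothetical helper name.

```python
import math


def min_reducer_count(worker_count, worker_gbps, reducer_gbps):
    """Smallest reducer count whose total bandwidth covers the workers' total."""
    total_worker_gbps = worker_count * worker_gbps
    return math.ceil(total_worker_gbps / reducer_gbps)


# Assumed egress limit for each GPU worker machine (verify for your machine type).
WORKER_GBPS = 50
# Egress limit of n1-highcpu-16, as stated above.
REDUCER_GBPS = 32

reducers = min_reducer_count(
    worker_count=2, worker_gbps=WORKER_GBPS, reducer_gbps=REDUCER_GBPS
)
print("Minimum reducers:", reducers)
```

Under these assumed figures, two workers need 100 Gbps in total, so three reducers (96 Gbps) fall short and four are the minimum.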

In [None]:
REPLICA_COUNT = 2
WORKER_MACHINE_TYPE = "a2-highgpu-4g"
ACCELERATOR_TYPE = "NVIDIA_TESLA_A100"
PER_MACHINE_ACCELERATOR_COUNT = 4
PER_REPLICA_BATCH_SIZE = 32

REDUCTION_SERVER_COUNT = 4
REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16"

#### Fine tune the  MNLI experiment settings

The default settings for the MNLI fine tuning experiment have been configured in the YAML configuration file created in the previous steps. To override the defaults for a specific training run, use the `--params_override` flag.

`params_override` accepts a string of comma-separated key/value pairs, one for each parameter to be overridden.

In the next cell you update the following settings:

- `trainer.train_steps` - The number of training steps.
- `trainer.steps_per_loop` - The training script prints out updates about training progress every `steps_per_loop` steps.
- `trainer.summary_interval` - The training script logs TensorBoard summaries every `summary_interval` steps.
- `trainer.validation_interval` - The training script runs validation every `validation_interval` steps.
- `trainer.checkpoint_interval` - The training script creates a checkpoint every `checkpoint_interval` steps.
- `task.train_data.global_batch_size` - Global batch size for training data.
- `task.validation_data.global_batch_size` - Global batch size for validation data.
- `task.train_data.input_path` - Location of the training dataset.
- `task.validation_data.input_path` - Location of the validation dataset.
- `runtime.num_gpus` - Number of GPUs to use on each worker.
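
The global batch size overrides scale a per-GPU batch size by the total number of GPUs in the cluster. As a worked example, assuming two machines with four GPUs each, a per-GPU batch size of 32, and the MNLI training set size noted in the YAML file, the arithmetic looks like this (the constant names are illustrative):

```python
REPLICAS = 2              # machines in worker pools 0 and 1 combined
GPUS_PER_MACHINE = 4
BATCH_PER_GPU = 32
TRAIN_STEPS = 2000
TRAIN_EXAMPLES = 392_702  # MNLI training set size noted in the YAML file

# Every GPU processes BATCH_PER_GPU examples in each training step.
global_batch_size = REPLICAS * GPUS_PER_MACHINE * BATCH_PER_GPU
examples_seen = TRAIN_STEPS * global_batch_size
approx_epochs = examples_seen / TRAIN_EXAMPLES

print("Global batch size:", global_batch_size)
print("Approximate epochs:", round(approx_epochs, 2))
```

With a larger global batch, a short run of 2,000 steps already covers a little more than one pass over the training data.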

In [None]:
PARAMS_OVERRIDE = ",".join(
    [
        "trainer.train_steps=2000",
        "trainer.steps_per_loop=100",
        "trainer.summary_interval=100",
        "trainer.validation_interval=2000",
        "trainer.checkpoint_interval=2000",
        "task.train_data.global_batch_size="
        + str(REPLICA_COUNT * PER_REPLICA_BATCH_SIZE * PER_MACHINE_ACCELERATOR_COUNT),
        "task.validation_data.global_batch_size="
        + str(REPLICA_COUNT * PER_REPLICA_BATCH_SIZE * PER_MACHINE_ACCELERATOR_COUNT),
        "task.train_data.input_path=" + TRAIN_FILE,
        "task.validation_data.input_path=" + EVAL_FILE,
        "runtime.num_gpus=" + str(PER_MACHINE_ACCELERATOR_COUNT),
    ]
)

### Submit and monitor the job

After the experiment and worker pool configuration parameters have been defined, use the Vertex SDK for Python to submit and monitor a training job.

In [None]:
JOB_NAME = "MNLI_{}".format(time.strftime("%Y%m%d_%H%M%S"))
MODEL_DIR = f"{BUCKET_NAME}/{JOB_NAME}/model"

WORKER_CMD = ["python", "trainer/train.py"]
WORKER_ARGS = [
    "--experiment=bert/sentence_prediction",
    "--mode=train",
    "--model_dir=" + MODEL_DIR,
    "--config_file=trainer/glue_mnli_matched.yaml",
    "--params_override=" + PARAMS_OVERRIDE,
]

worker_pool_specs = prepare_worker_pool_specs(
    image_uri=TRAIN_IMAGE,
    args=WORKER_ARGS,
    cmd=WORKER_CMD,
    replica_count=REPLICA_COUNT,
    machine_type=WORKER_MACHINE_TYPE,
    accelerator_count=PER_MACHINE_ACCELERATOR_COUNT,
    accelerator_type=ACCELERATOR_TYPE,
    reduction_server_count=REDUCTION_SERVER_COUNT,
    reduction_server_machine_type=REDUCTION_SERVER_MACHINE_TYPE,
)

In [None]:
job = aiplatform.CustomJob(
    display_name=JOB_NAME,
    worker_pool_specs=worker_pool_specs,
)
job.run(sync=False)
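
Because `job.run(sync=False)` returns immediately, one way to monitor the job from the notebook is to poll its state. The sketch below is an illustration under stated assumptions: `wait_for_job` is a hypothetical helper, and the terminal state names are assumed to match the Vertex AI `JobState` enum (the set may not be exhaustive). The example drives the helper with a stand-in state sequence so that it runs without a real job.

```python
import time

# Assumed to match terminal values of the Vertex AI JobState enum.
TERMINAL_STATES = {
    "JOB_STATE_SUCCEEDED",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
}


def wait_for_job(get_state, poll_seconds=60, sleep=time.sleep):
    """Poll `get_state()` until it returns a terminal state name, then return it."""
    while True:
        state = get_state()
        print("Job state:", state)
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)


# Stand-in state sequence so the sketch runs without a real job.
fake_states = iter(
    ["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_SUCCEEDED"]
)
final_state = wait_for_job(lambda: next(fake_states), sleep=lambda seconds: None)
```

With a real job, the call might look like `wait_for_job(lambda: job.state.name)`, assuming the SDK's `CustomJob` object exposes a `state` property. The job resource may take a short time to be created after submission, so the first poll may need to be delayed. You can also follow the job's progress and logs in the Cloud Console.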

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:


In [None]:
# Delete the Cloud Storage bucket and all objects in it
! gsutil -m rm -r {BUCKET_NAME}

# Delete the training job
delete_training_job = True
if delete_training_job:
    job.delete()