notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb (1,157 lines of code) (raw):

{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "b0b4f2bf" }, "outputs": [], "source": [ "# Copyright 2021 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "c5c0d0bbf74d" }, "source": [ "# Vertex AI Pipelines: Custom training with pre-built Google Cloud Pipeline Components\n", "\n", "<table align=\"left\">\n", " <td style=\"text-align: center\">\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Google Colaboratory logo\"><br> Open in Colab\n", " </a>\n", " </td>\n", " <td style=\"text-align: center\">\n", " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Fcustom_model_training_and_batch_prediction.ipynb\">\n", " <img width=\"32px\" src=\"https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png\" alt=\"Google Cloud Colab Enterprise logo\"><br> Open in Colab Enterprise\n", " </a>\n", " </td> \n", " <td style=\"text-align: center\">\n", " <a 
href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb\">\n", " <img src=\"https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32\" alt=\"Vertex AI logo\"><br> Open in Workbench\n", " </a>\n", " </td>\n", " <td style=\"text-align: center\">\n", " <a href=\"https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\"><br> View on GitHub\n", " </a>\n", " </td>\n", "</table>" ] }, { "cell_type": "markdown", "metadata": { "id": "d79790f56d30" }, "source": [ "## Overview\n", "\n", "\n", "This tutorial demonstrates how to use Vertex AI Pipelines with pre-built components from Google Cloud Pipeline Components for custom training.\n", "\n", "Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) and [Custom training components](https://cloud.google.com/vertex-ai/docs/training/create-training-pipeline)." 
] }, { "cell_type": "markdown", "metadata": { "id": "b87eb4840013" }, "source": [ "### Objective\n", "\n", "In this tutorial, you learn to use Vertex AI Pipelines and Google Cloud Pipeline Components to train a custom model and run batch predictions with it.\n", "\n", "\n", "This tutorial uses the following Vertex AI services:\n", "\n", "- Vertex AI Pipelines\n", "- Google Cloud Pipeline Components\n", "- Vertex AI Training\n", "- Vertex AI model resource\n", "- Vertex AI batch prediction\n", "\n", "The steps performed include:\n", "\n", "- Create a KFP pipeline:\n", " - Train a custom model.\n", " - Upload the trained model as a model resource.\n", " - Make a batch prediction request.\n", "\n", "Learn more about [Google Cloud Pipeline Components](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)." ] }, { "cell_type": "markdown", "metadata": { "id": "57139e75264f" }, "source": [ "### Dataset\n", "\n", "The dataset used for this tutorial is the [CIFAR10 dataset](https://www.tensorflow.org/datasets/catalog/cifar10) from [TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/overview). The training component loads it with TensorFlow Datasets, and the evaluation step uses the copy built into Keras (`tf.keras.datasets.cifar10`). The trained model predicts which of ten classes an image belongs to: airplane, automobile, bird, cat, deer, dog, frog, horse, ship, or truck." ] }, { "cell_type": "markdown", "metadata": { "id": "181d4dfbf917" }, "source": [ "### Costs\n", "\n", "This tutorial uses billable components of Google Cloud:\n", "\n", "* Vertex AI\n", "* Cloud Storage\n", "\n", "Learn about [Vertex AI\n", "pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage\n", "pricing](https://cloud.google.com/storage/pricing), and use the [Pricing\n", "Calculator](https://cloud.google.com/products/calculator/)\n", "to generate a cost estimate based on your projected usage." 
] }, { "cell_type": "markdown", "metadata": { "id": "4b331e2fd155" }, "source": [ "## Get Started" ] }, { "cell_type": "markdown", "metadata": { "id": "install_aip:mbsdk" }, "source": [ "### Install Vertex AI SDK for Python and other required packages" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "install_aip:mbsdk" }, "outputs": [], "source": [ "! pip3 install --upgrade --quiet google-cloud-aiplatform \\\n", " google-cloud-storage \\\n", " kfp \\\n", " google-cloud-pipeline-components\n", "\n", "\n", "! pip3 install --upgrade --force-reinstall tensorflow kfp google-cloud-aiplatform google-cloud-storage google-cloud-pipeline-components -q" ] }, { "cell_type": "markdown", "metadata": { "id": "ff555b32bab8" }, "source": [ "### Restart runtime (Colab only)\n", "\n", "To use the newly installed packages, you must restart the runtime on Google Colab." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "f09b4dff629a" }, "outputs": [], "source": [ "import sys\n", "\n", "if \"google.colab\" in sys.modules:\n", "\n", " import IPython\n", "\n", " app = IPython.Application.instance()\n", " app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "metadata": { "id": "4a2b7b59bbf7" }, "source": [ "<div class=\"alert alert-block alert-warning\">\n", "<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>\n", "</div>" ] }, { "cell_type": "markdown", "metadata": { "id": "f82e28c631cc" }, "source": [ "### Authenticate your notebook environment (Colab only)\n", "\n", "Authenticate your environment on Google Colab." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "46604f70e831" }, "outputs": [], "source": [ "import sys\n", "\n", "if \"google.colab\" in sys.modules:\n", "\n", " from google.colab import auth\n", "\n", " auth.authenticate_user()" ] }, { "cell_type": "markdown", "metadata": { "id": "BF1j6f9HApxa" }, "source": [ "### Set Google Cloud project information\n", "\n", "To get started using Vertex AI, you must have an existing Google Cloud project. Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ab779e2d71d3" }, "outputs": [], "source": [ "PROJECT_ID = \"[your-project-id]\" # @param {type:\"string\"}\n", "LOCATION = \"us-central1\" # @param {type: \"string\"}" ] }, { "cell_type": "markdown", "metadata": { "id": "zgPO1eR3CYjk" }, "source": [ "### Create a Cloud Storage bucket\n", "\n", "Create a storage bucket to store intermediate artifacts such as datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "MzGDU7TWdts_" }, "outputs": [], "source": [ "BUCKET_URI = f\"gs://your-bucket-name-{PROJECT_ID}-unique\" # @param {type:\"string\"}" ] }, { "cell_type": "markdown", "metadata": { "id": "-EcIXiGsCePi" }, "source": [ "**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NIq7R4HZCfIc" }, "outputs": [], "source": [ "! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}" ] }, { "cell_type": "markdown", "metadata": { "id": "set_service_account" }, "source": [ "#### Service account\n", "\n", "**If you don't know your service account**, run the second cell below, which tries to retrieve it with the `gcloud` command." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "set_service_account" }, "outputs": [], "source": [ "SERVICE_ACCOUNT = \"[your-service-account]\" # @param {type:\"string\"}" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "autoset_service_account" }, "outputs": [], "source": [ "import sys\n", "\n", "IS_COLAB = \"google.colab\" in sys.modules\n", "if (\n", " SERVICE_ACCOUNT == \"\"\n", " or SERVICE_ACCOUNT is None\n", " or SERVICE_ACCOUNT == \"[your-service-account]\"\n", "):\n", " # Get your service account from gcloud\n", " if not IS_COLAB:\n", " shell_output = !gcloud auth list 2>/dev/null\n", " SERVICE_ACCOUNT = shell_output[2].replace(\"*\", \"\").strip()\n", "\n", " if IS_COLAB:\n", " shell_output = ! gcloud projects describe $PROJECT_ID\n", " project_number = shell_output[-1].split(\":\")[1].strip().replace(\"'\", \"\")\n", " SERVICE_ACCOUNT = f\"{project_number}-compute@developer.gserviceaccount.com\"\n", "\n", " print(\"Service Account:\", SERVICE_ACCOUNT)" ] }, { "cell_type": "markdown", "metadata": { "id": "set_service_account:pipelines" }, "source": [ "#### Set service account access for Vertex AI Pipelines\n", "\n", "Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run these once per service account." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "set_service_account:pipelines" }, "outputs": [], "source": [ "! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI\n", "\n", "! 
gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI" ] }, { "cell_type": "markdown", "metadata": { "id": "setup_vars" }, "source": [ "### Import libraries and define constants" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "import_aip:mbsdk" }, "outputs": [], "source": [ "import json\n", "\n", "import google.cloud.aiplatform as aiplatform\n", "import tensorflow as tf\n", "from google_cloud_pipeline_components.v1.custom_job import utils\n", "from kfp import compiler, dsl\n", "from kfp.dsl import component" ] }, { "cell_type": "markdown", "metadata": { "id": "pipeline_constants" }, "source": [ "#### Vertex AI Pipelines constants\n", "\n", "Set up the following constants for Vertex AI Pipelines:\n", "\n", "- `PIPELINE_ROOT`: Root folder in your Cloud Storage bucket for storing pipeline artifacts." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pipeline_constants" }, "outputs": [], "source": [ "PIPELINE_ROOT = \"{}/pipeline_root/bikes_weather\".format(BUCKET_URI)" ] }, { "cell_type": "markdown", "metadata": { "id": "init_aip:mbsdk" }, "source": [ "### Initialize Vertex AI SDK for Python\n", "\n", "Initialize the Vertex AI SDK for Python for your project and corresponding bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "yX-aimhGRRRl" }, "outputs": [], "source": [ "aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)" ] }, { "cell_type": "markdown", "metadata": { "id": "accelerators:training,cpu,prediction,cpu,mbsdk" }, "source": [ "### Set hardware accelerators\n", "\n", "You can set hardware accelerators for training and prediction.\n", "\n", "Set the variables `TRAIN_GPU/TRAIN_NGPU` and `DEPLOY_GPU/DEPLOY_NGPU` to use a container image supporting a GPU and the number of GPUs allocated to the virtual machine (VM) instance. 
For example, to use a GPU container image with 4 Nvidia Tesla T4 GPUs allocated to each VM, you'd specify:\n", "\n", " (aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_T4, 4)\n", "\n", "\n", "Otherwise, specify `(None, None)` to use a container image that runs on a CPU.\n", "\n", "Learn more about [hardware accelerator support for your region](https://cloud.google.com/vertex-ai/docs/general/locations#accelerators).\n", "\n", "**Note**: GPU-enabled TF releases before 2.3 are expected to fail to load the custom model in this tutorial. This known issue is caused by static graph ops that are generated in the serving function, and it's fixed in TF 2.3. If you encounter this issue with your own custom models, use a TF 2.3 container image with GPU support." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ipG9uBUDRRRm" }, "outputs": [], "source": [ "TRAIN_GPU, TRAIN_NGPU = (None, None)\n", "\n", "DEPLOY_GPU, DEPLOY_NGPU = (None, None)" ] }, { "cell_type": "markdown", "metadata": { "id": "container:training,prediction" }, "source": [ "### Set pre-built containers\n", "\n", "Set the pre-built Docker container images for training and prediction.\n", "\n", "For the latest lists, see [Pre-built containers for training](https://cloud.google.com/ai-platform-unified/docs/training/pre-built-containers) and [Pre-built containers for prediction](https://cloud.google.com/ai-platform-unified/docs/predictions/pre-built-containers)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "WrKTNnVLRRRm" }, "outputs": [], "source": [ "TF = \"2-13\"\n", "\n", "if TRAIN_GPU:\n", " TRAIN_VERSION = \"tf-gpu.{}\".format(TF)\n", "else:\n", " TRAIN_VERSION = \"tf-cpu.{}\".format(TF)\n", "if DEPLOY_GPU:\n", " DEPLOY_VERSION = \"tf2-gpu.{}\".format(TF)\n", "else:\n", " DEPLOY_VERSION = \"tf2-cpu.{}\".format(TF)\n", "\n", "TRAIN_IMAGE = \"gcr.io/vertex-ai/training/{}:latest\".format(TRAIN_VERSION)\n", "DEPLOY_IMAGE = \"gcr.io/vertex-ai/prediction/{}:latest\".format(DEPLOY_VERSION)\n", "\n", "print(\"Training:\", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)\n", "print(\"Deployment:\", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)" ] }, { "cell_type": "markdown", "metadata": { "id": "machine:training,prediction" }, "source": [ "### Set machine type\n", "\n", "Next, set the machine type to use for training and prediction.\n", "\n", "- Set the variables `TRAIN_COMPUTE` and `DEPLOY_COMPUTE` to configure the compute resources for the VMs you'll use for training and prediction.\n", " - `machine type`\n", " - `n1-standard`: 3.75 GB of memory per vCPU\n", " - `n1-highmem`: 6.5 GB of memory per vCPU\n", " - `n1-highcpu`: 0.9 GB of memory per vCPU\n", " - `vCPUs`: one of \\[2, 4, 8, 16, 32, 64, 96\\]\n", "\n", "**Note**: The following machine types aren't supported for training:\n", "\n", " - `standard`: 2 vCPUs\n", " - `highcpu`: 2, 4 and 8 vCPUs\n", "\n", "**Note**: You may also use n2 and e2 machine types for training and deployment, but they don't support GPUs." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "K4RhNsDzRRRn" }, "outputs": [], "source": [ "MACHINE_TYPE = \"n1-standard\"\n", "\n", "VCPU = \"4\"\n", "TRAIN_COMPUTE = MACHINE_TYPE + \"-\" + VCPU\n", "print(\"Train machine type\", TRAIN_COMPUTE)\n", "\n", "VCPU = \"4\"\n", "DEPLOY_COMPUTE = MACHINE_TYPE + \"-\" + VCPU\n", "print(\"Deploy machine type\", DEPLOY_COMPUTE)" ] }, { "cell_type": "markdown", "metadata": { "id": "tutorial_start:custom" }, "source": [ "## Tutorial\n", "\n", "Now you're ready to start training on CIFAR10 and create your own custom model." ] }, { "cell_type": "markdown", "metadata": { "id": "examine_training_package" }, "source": [ "### Examine the training package\n", "\n", "#### Package layout\n", "\n", "Before you start training, look at how a Python package is assembled for a custom training job. When unarchived, the package contains the following directory/file layout:\n", "\n", "- PKG-INFO\n", "- README.md\n", "- setup.cfg\n", "- setup.py\n", "- trainer\n", " - \\_\\_init\\_\\_.py\n", " - task.py\n", "\n", "The files `setup.cfg` and `setup.py` are the instructions for installing the package into the operating environment of the Docker image.\n", "\n", "The file `trainer/task.py` is the Python script for executing the custom training job. **Note**: When you refer to the file in the worker pool specification, drop the file suffix (`.py`) and replace the directory slash with a dot (`trainer.task`).\n", "\n", "#### Package assembly\n", "\n", "In the following cells, you assemble the training package." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YpA6MFcLRRRn" }, "outputs": [], "source": [ "# Make folder for Python training script\n", "! rm -rf custom\n", "! mkdir custom\n", "\n", "# Add package information\n", "! touch custom/README.md\n", "\n", "setup_cfg = \"[egg_info]\\n\\ntag_build =\\n\\ntag_date = 0\"\n", "! 
echo \"$setup_cfg\" > custom/setup.cfg\n", "\n", "setup_py = \"import setuptools\\n\\nsetuptools.setup(\\n\\n install_requires=[\\n\\n 'tensorflow_datasets==1.3.0',\\n\\n ],\\n\\n packages=setuptools.find_packages())\"\n", "! echo \"$setup_py\" > custom/setup.py\n", "\n", "pkg_info = \"Metadata-Version: 1.0\\n\\nName: CIFAR10 image classification\\n\\nVersion: 0.0.0\\n\\nSummary: Demonstration training script\\n\\nHome-page: www.google.com\\n\\nAuthor: Google\\n\\nAuthor-email: aferlitsch@google.com\\n\\nLicense: Public\\n\\nDescription: Demo\\n\\nPlatform: Vertex\"\n", "! echo \"$pkg_info\" > custom/PKG-INFO\n", "\n", "# Make the training subfolder\n", "! mkdir custom/trainer\n", "! touch custom/trainer/__init__.py" ] }, { "cell_type": "markdown", "metadata": { "id": "86f3ab08b20a" }, "source": [ "### Create a custom component for training the custom model\n", "\n", "Next, create a lightweight Python function component for training the CIFAR10 image classification model." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "2etUCVVMRRRo" }, "outputs": [], "source": [ "# Single, Mirror and Multi-Machine Distributed Training for CIFAR-10\n", "\n", "\n", "@component(\n", " base_image=\"tensorflow/tensorflow:2.13.0\",\n", " packages_to_install=[\"tensorflow_datasets\", \"opencv-python-headless\"],\n", ")\n", "def custom_train_model(\n", " model_dir: str,\n", " lr: float = 0.01,\n", " epochs: int = 10,\n", " steps: int = 200,\n", " distribute: str = \"single\",\n", "):\n", "\n", " import faulthandler\n", " import os\n", " import sys\n", "\n", " import tensorflow as tf\n", " import tensorflow_datasets as tfds\n", " from tensorflow import keras\n", " from tensorflow.python.client import device_lib\n", "\n", " faulthandler.enable()\n", " tfds.disable_progress_bar()\n", "\n", " print(\"Component start\")\n", "\n", " print(\"Python Version = {}\".format(sys.version))\n", " print(\"TensorFlow Version = {}\".format(tf.__version__))\n", " print(\"TF_CONFIG 
= {}\".format(os.environ.get(\"TF_CONFIG\", \"Not found\")))\n", " print(\"DEVICES\", device_lib.list_local_devices())\n", "\n", " # Single machine, single compute device\n", " if distribute == \"single\":\n", " if tf.config.list_physical_devices(\"GPU\"):\n", " strategy = tf.distribute.OneDeviceStrategy(device=\"/gpu:0\")\n", " else:\n", " strategy = tf.distribute.OneDeviceStrategy(device=\"/cpu:0\")\n", " # Single machine, multiple compute devices\n", " elif distribute == \"mirror\":\n", " strategy = tf.distribute.MirroredStrategy()\n", " # Multiple machines, multiple compute devices\n", " elif distribute == \"multi\":\n", " strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()\n", " else:\n", " raise ValueError(\"Unknown distribute value: {}\".format(distribute))\n", "\n", " # Number of replicas kept in sync by the strategy\n", " print(\"num_replicas_in_sync = {}\".format(strategy.num_replicas_in_sync))\n", "\n", " # Preparing dataset\n", " BUFFER_SIZE = 10000\n", " BATCH_SIZE = 64\n", "\n", " def make_datasets_unbatched():\n", "\n", " # Scaling CIFAR10 data from [0, 255] to [0., 1.]\n", " def scale(image, label):\n", " image = tf.cast(image, tf.float32)\n", " image /= 255.0\n", " return image, label\n", "\n", " datasets, info = tfds.load(name=\"cifar10\", with_info=True, as_supervised=True)\n", " return datasets[\"train\"].map(scale).cache().shuffle(BUFFER_SIZE).repeat()\n", "\n", " # Build the Keras model\n", " def build_and_compile_cnn_model(lr: float = 0.01):\n", " model = keras.Sequential(\n", " [\n", " keras.layers.Conv2D(32, 3, activation=\"relu\", input_shape=(32, 32, 3)),\n", " keras.layers.MaxPooling2D(),\n", " keras.layers.Conv2D(32, 3, activation=\"relu\"),\n", " keras.layers.MaxPooling2D(),\n", " keras.layers.Flatten(),\n", " keras.layers.Dense(10, activation=\"softmax\"),\n", " ]\n", " )\n", " model.compile(\n", " loss=keras.losses.sparse_categorical_crossentropy,\n", " optimizer=keras.optimizers.SGD(learning_rate=lr),\n", " metrics=[\"accuracy\"],\n", " )\n", " return model\n", "\n", " # Train the model\n", " NUM_WORKERS = strategy.num_replicas_in_sync\n", " 
# Here the batch size scales up by the number of replicas because\n", " # `tf.data.Dataset.batch` expects the global batch size.\n", " GLOBAL_BATCH_SIZE = BATCH_SIZE * NUM_WORKERS\n", " train_dataset = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE)\n", "\n", " with strategy.scope():\n", " # Model building and compiling must happen within\n", " # `strategy.scope()`.\n", " model = build_and_compile_cnn_model(lr)\n", "\n", " model.fit(x=train_dataset, epochs=epochs, steps_per_epoch=steps)\n", "\n", " # Save the model\n", " model.save(model_dir + \".keras\")\n", "\n", " model_path_to_deploy = model_dir\n", "\n", " # Load the saved model\n", " local_model = keras.models.load_model(model_dir + \".keras\")\n", "\n", " # Load evaluation data\n", " import numpy as np\n", " from tensorflow.keras.datasets import cifar10\n", "\n", " (_, _), (x_test, y_test) = cifar10.load_data()\n", " x_test = (x_test / 255.0).astype(np.float32)\n", "\n", " print(x_test.shape, y_test.shape)\n", "\n", " # Perform the model evaluation\n", " local_model.evaluate(x_test, y_test)\n", "\n", " # Serving function for image data\n", " CONCRETE_INPUT = \"numpy_inputs\"\n", "\n", " def _preprocess(bytes_input):\n", " decoded = tf.io.decode_jpeg(bytes_input, channels=3)\n", " decoded = tf.image.convert_image_dtype(decoded, tf.float32)\n", " resized = tf.image.resize(decoded, size=(32, 32))\n", " return resized\n", "\n", " @tf.function(input_signature=[tf.TensorSpec([None], tf.string)])\n", " def preprocess_fn(bytes_inputs):\n", " decoded_images = tf.map_fn(\n", " _preprocess, bytes_inputs, fn_output_signature=tf.float32\n", " )\n", " return {\n", " CONCRETE_INPUT: decoded_images\n", " } # User needs to make sure the key matches model's input\n", "\n", " @tf.function(input_signature=[tf.TensorSpec([None], tf.string)])\n", " def serving_fn(bytes_inputs):\n", " images = preprocess_fn(bytes_inputs)\n", " prob = m_call(**images)\n", " return prob\n", "\n", " m_call = 
tf.function(local_model.call).get_concrete_function(\n", " [tf.TensorSpec(shape=[None, 32, 32, 3], dtype=tf.float32, name=CONCRETE_INPUT)]\n", " )\n", "\n", " tf.saved_model.save(\n", " local_model, model_path_to_deploy, signatures={\"serving_default\": serving_fn}\n", " )\n", "\n", " # Get the serving function signature\n", " loaded = tf.saved_model.load(model_path_to_deploy)\n", "\n", " serving_input = list(\n", " loaded.signatures[\"serving_default\"].structured_input_signature[1].keys()\n", " )[0]\n", " print(\"Serving function input:\", serving_input)\n", "\n", " # Get test items\n", " test_image_1 = x_test[0]\n", " test_image_2 = x_test[1]\n", " print(test_image_1.shape)\n", "\n", " BUCKET_URI = model_dir + \"/test\"\n", "\n", " import cv2\n", "\n", " cv2.imwrite(\"tmp1.jpg\", (test_image_1 * 255).astype(np.uint8))\n", " cv2.imwrite(\"tmp2.jpg\", (test_image_2 * 255).astype(np.uint8))\n", "\n", " print(\"Writing jpg files\")\n", "\n", " # Copy test item(s)\n", " # For the batch prediction, copy the test items over to your Cloud Storage bucket.\n", " test_item_1 = BUCKET_URI + \"/tmp1.jpg\"\n", " test_item_2 = BUCKET_URI + \"/tmp2.jpg\"\n", "\n", " with tf.io.gfile.GFile(test_item_1, \"wb\") as w:\n", " with tf.io.gfile.GFile(\"tmp1.jpg\", \"rb\") as r:\n", " bytes = r.read()\n", " w.write(bytes)\n", "\n", " with tf.io.gfile.GFile(test_item_2, \"wb\") as w:\n", " with tf.io.gfile.GFile(\"tmp2.jpg\", \"rb\") as r:\n", " bytes = r.read()\n", " w.write(bytes)\n", "\n", " # Make the batch input file\n", " import base64\n", " import json\n", "\n", " gcs_input_uri = BUCKET_URI + \"/\" + \"test.jsonl\"\n", " with tf.io.gfile.GFile(gcs_input_uri, \"w\") as f:\n", " bytes = tf.io.read_file(test_item_1)\n", " b64str = base64.b64encode(bytes.numpy()).decode(\"utf-8\")\n", " data = {serving_input: {\"b64\": b64str}}\n", " f.write(json.dumps(data) + \"\\n\")\n", " bytes = tf.io.read_file(test_item_2)\n", " b64str = base64.b64encode(bytes.numpy()).decode(\"utf-8\")\n", " 
data = {serving_input: {\"b64\": b64str}}\n", " f.write(json.dumps(data) + \"\\n\")" ] }, { "cell_type": "markdown", "metadata": { "id": "1abdbe48" }, "source": [ "### Convert the component to a Vertex AI Custom Job\n", "\n", "Next, use the `create_custom_training_job_op_from_component` function to convert the custom component into a pre-built component that runs as a Vertex AI custom job.\n", "\n", "**replica_count:** The number of machine replicas for the custom job. One replica always serves as the primary worker in the first worker pool, and any remaining replicas are allocated to a second worker pool. Default is 1." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "deb6c1cc" }, "outputs": [], "source": [ "custom_job_distributed_training_op = utils.create_custom_training_job_op_from_component(\n", " custom_train_model, replica_count=1\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "e57e7311" }, "source": [ "### Define the pipeline for the custom training job\n", "\n", "Next, define the pipeline job that runs the following tasks:\n", "\n", "- Trains the custom model.\n", "- Uploads the model to Vertex AI Model Registry.\n", "- Runs a batch prediction job." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5R23d2J_HJr-" }, "outputs": [], "source": [ "MODEL_DIR = BUCKET_URI + \"/model\"\n", "\n", "\n", "@dsl.pipeline(name=\"custom-model-training-sample-pipeline\")\n", "def pipeline(\n", " model_dir: str = MODEL_DIR,\n", " lr: float = 0.01,\n", " epochs: int = 10,\n", " steps: int = 200,\n", " distribute: str = \"single\",\n", "):\n", " from google_cloud_pipeline_components.types import artifact_types\n", " from google_cloud_pipeline_components.v1.batch_predict_job import \\\n", " ModelBatchPredictOp\n", " from google_cloud_pipeline_components.v1.model import ModelUploadOp\n", " from kfp.dsl import importer_node\n", "\n", " custom_producer_task = custom_job_distributed_training_op(\n", " model_dir=model_dir,\n", " lr=lr,\n", " epochs=epochs,\n", " steps=steps,\n", " distribute=distribute,\n", " project=PROJECT_ID,\n", " location=LOCATION,\n", " base_output_directory=PIPELINE_ROOT,\n", " )\n", "\n", " unmanaged_model_importer = importer_node.importer(\n", " artifact_uri=model_dir,\n", " artifact_class=artifact_types.UnmanagedContainerModel,\n", " metadata={\n", " \"containerSpec\": {\n", " \"imageUri\": \"us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest\"\n", " }\n", " },\n", " )\n", "\n", " model_upload_op = ModelUploadOp(\n", " project=PROJECT_ID,\n", " display_name=\"model_display_name\",\n", " unmanaged_container_model=unmanaged_model_importer.outputs[\"artifact\"],\n", " )\n", " model_upload_op.after(custom_producer_task)\n", "\n", " batch_predict_op = ModelBatchPredictOp(\n", " project=PROJECT_ID,\n", " job_display_name=\"batch_predict_job\",\n", " model=model_upload_op.outputs[\"model\"],\n", " gcs_source_uris=[MODEL_DIR + \"/test/test.jsonl\"],\n", " gcs_destination_output_uri_prefix=PIPELINE_ROOT,\n", " instances_format=\"jsonl\",\n", " predictions_format=\"jsonl\",\n", " model_parameters={},\n", " machine_type=DEPLOY_COMPUTE,\n", " starting_replica_count=1,\n", " 
max_replica_count=1,\n", " )\n", "\n", " batch_predict_op.after(model_upload_op)" ] }, { "cell_type": "markdown", "metadata": { "id": "e584790f" }, "source": [ "### Compile and run the pipeline\n", "\n", "Next, compile the pipeline into a pipeline spec (YAML) and then execute it." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6be28881" }, "outputs": [], "source": [ "compiler.Compiler().compile(\n", " pipeline_func=pipeline, package_path=\"custom_model_training_spec.yaml\"\n", ")\n", "\n", "DISPLAY_NAME = \"cifar10\"\n", "\n", "job = aiplatform.PipelineJob(\n", " display_name=DISPLAY_NAME,\n", " template_path=\"custom_model_training_spec.yaml\",\n", " pipeline_root=PIPELINE_ROOT,\n", ")\n", "\n", "job.run(service_account=SERVICE_ACCOUNT)" ] }, { "cell_type": "markdown", "metadata": { "id": "87774222c338" }, "source": [ "### View custom training pipeline results\n", "\n", "Finally, view the artifact outputs of the model upload and batch prediction tasks in the pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ac26e1314c80" }, "outputs": [], "source": [ "PROJECT_NUMBER = job.gca_resource.name.split(\"/\")[1]\n", "print(\"Project number:\", PROJECT_NUMBER)\n", "\n", "\n", "def print_pipeline_output(job, output_task_name):\n", " JOB_ID = job.name\n", " print(JOB_ID)\n", " for _ in range(len(job.gca_resource.job_detail.task_details)):\n", " TASK_ID = job.gca_resource.job_detail.task_details[_].task_id\n", " EXECUTE_OUTPUT = (\n", " PIPELINE_ROOT\n", " + \"/\"\n", " + PROJECT_NUMBER\n", " + \"/\"\n", " + JOB_ID\n", " + \"/\"\n", " + output_task_name\n", " + \"_\"\n", " + str(TASK_ID)\n", " + \"/executor_output.json\"\n", " )\n", " GCP_RESOURCES = (\n", " PIPELINE_ROOT\n", " + \"/\"\n", " + PROJECT_NUMBER\n", " + \"/\"\n", " + JOB_ID\n", " + \"/\"\n", " + output_task_name\n", " + \"_\"\n", " + str(TASK_ID)\n", " + \"/gcp_resources\"\n", " )\n", " EVAL_METRICS = (\n", " PIPELINE_ROOT\n", " + \"/\"\n", " + PROJECT_NUMBER\n", " + \"/\"\n", " + JOB_ID\n", " + 
\"/\"\n", " + output_task_name\n", " + \"_\"\n", " + str(TASK_ID)\n", " + \"/evaluation_metrics\"\n", " )\n", " if tf.io.gfile.exists(EXECUTE_OUTPUT):\n", " ! gsutil cat $EXECUTE_OUTPUT\n", " return EXECUTE_OUTPUT\n", " elif tf.io.gfile.exists(GCP_RESOURCES):\n", " ! gsutil cat $GCP_RESOURCES\n", " return GCP_RESOURCES\n", " elif tf.io.gfile.exists(EVAL_METRICS):\n", " ! gsutil cat $EVAL_METRICS\n", " return EVAL_METRICS\n", "\n", " return None\n", "\n", "\n", "print(\"model-upload\")\n", "artifacts = print_pipeline_output(job, \"model-upload\")\n", "print(\"\\n\")\n", "output = !gsutil cat $artifacts\n", "print(output)\n", "output = json.loads(output[0])\n", "model_id = output[\"artifacts\"][\"model\"][\"artifacts\"][0][\"metadata\"][\"resourceName\"]\n", "print(\"model-batch-predict\")\n", "artifacts = print_pipeline_output(job, \"model-batch-predict\")\n", "print(\"\\n\")\n", "output = !gsutil cat $artifacts\n", "output = json.loads(output[0])\n", "batch_job_id = output[\"artifacts\"][\"batchpredictionjob\"][\"artifacts\"][0][\"metadata\"][\n", " \"resourceName\"\n", "]" ] }, { "cell_type": "markdown", "metadata": { "id": "a2cd7ee5cc21" }, "source": [ "## Cleaning up\n", "\n", "To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud\n", "project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n", "\n", "Otherwise, you can delete the individual resources you created in this tutorial." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "c29a5f39e211" }, "outputs": [], "source": [ "# Delete the model\n", "model = aiplatform.Model(model_id)\n", "model.delete()\n", "\n", "# Delete the batch prediction job\n", "batch_job = aiplatform.BatchPredictionJob(batch_job_id)\n", "batch_job.delete()\n", "\n", "# Delete the pipeline\n", "job.delete()\n", "\n", "# Delete the Cloud Storage bucket\n", "delete_bucket = False # Set True for deletion\n", "if delete_bucket:\n", " ! gsutil rm -r $BUCKET_URI\n", "\n", "# Remove the locally generated files\n", "! rm custom_model_training_spec.yaml\n", "! rm -rf custom" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "custom_model_training_and_batch_prediction.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }