notebooks/official/pipelines/control_flow_kfp.ipynb (668 lines of code) (raw):

{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "copyright" }, "outputs": [], "source": [ "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "title:generic" }, "source": [ "# Vertex AI Pipelines: pipeline control structures using the KFP SDK\n", "\n", "<table align=\"left\">\n", " <td style=\"text-align: center\">\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Google Colaboratory logo\"><br> Open in Colab\n", " </a>\n", " </td>\n", " <td style=\"text-align: center\">\n", " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Fcontrol_flow_kfp.ipynb\">\n", " <img width=\"32px\" src=\"https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png\" alt=\"Google Cloud Colab Enterprise logo\"><br> Open in Colab Enterprise\n", " </a>\n", " </td> \n", " <td style=\"text-align: center\">\n", " <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/control_flow_kfp.ipynb\">\n", " <img src=\"https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32\" alt=\"Vertex AI logo\"><br> Open in Workbench\n", " </a>\n", " </td>\n", " <td style=\"text-align: center\">\n", " <a href=\"https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\"><br> View on GitHub\n", " </a>\n", " </td>\n", "</table>" ] }, { "cell_type": "markdown", "metadata": { "id": "overview:pipelines,control" }, "source": [ "## Overview\n", "\n", "This notebook demostrates how to use [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/v2/) to build Vertex AI Pipelines using control structures.\n", "\n", "Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)." ] }, { "cell_type": "markdown", "metadata": { "id": "objective:pipelines,control" }, "source": [ "### Objective\n", "\n", "In this tutorial, you learn how to use the KFP SDK, which uses loops and conditionals including nested examples, to build pipelines.\n", "\n", "This tutorial uses the following Google Cloud ML services:\n", "\n", "- Vertex AI Pipelines\n", "\n", "The steps performed include:\n", "\n", "- Create a KFP pipeline using control flow components\n", "- Compile the KFP pipeline\n", "- Execute the KFP pipeline using Vertex AI Pipelines" ] }, { "cell_type": "markdown", "metadata": { "id": "costs" }, "source": [ "### Costs\n", "\n", "This tutorial uses the following billable components of Google Cloud:\n", "\n", "* Vertex AI\n", "* Cloud Storage\n", "\n", "Learn about [Vertex AI\n", "pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage\n", "pricing](https://cloud.google.com/storage/pricing), and use the [Pricing\n", "Calculator](https://cloud.google.com/products/calculator/)\n", "to generate a cost estimate based on your projected usage." ] }, { "cell_type": "markdown", "metadata": { "id": "install_aip:mbsdk" }, "source": [ "## Get started\n" ] }, { "cell_type": "markdown", "metadata": { "id": "No17Cw5hgx12" }, "source": [ "### Install Vertex AI SDK for Python and other required packages\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "install_aip:mbsdk" }, "outputs": [], "source": [ "! pip3 install --upgrade --quiet google-cloud-aiplatform \\\n", " google-cloud-storage \\\n", " kfp \\\n", " google-cloud-pipeline-components" ] }, { "cell_type": "markdown", "metadata": { "id": "R5Xep4W9lq-Z" }, "source": [ "### Restart runtime (Colab only)\n", "\n", "To use the newly installed packages, you must restart the runtime on Google Colab." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "XRvKdaPDTznN" }, "outputs": [], "source": [ "import sys\n", "\n", "if \"google.colab\" in sys.modules:\n", "\n", " import IPython\n", "\n", " app = IPython.Application.instance()\n", " app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "metadata": { "id": "SbmM4z7FOBpM" }, "source": [ "<div class=\"alert alert-block alert-warning\">\n", "<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>\n", "</div>\n" ] }, { "cell_type": "markdown", "metadata": { "id": "dmWOrTJ3gx13" }, "source": [ "### Authenticate your notebook environment (Colab only)\n", "\n", "Authenticate your environment on Google Colab.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NyKGtVQjgx13" }, "outputs": [], "source": [ "import sys\n", "\n", "if \"google.colab\" in sys.modules:\n", "\n", " from google.colab import auth\n", "\n", " auth.authenticate_user()" ] }, { "cell_type": "markdown", "metadata": { "id": "DF4l8DTdWgPY" }, "source": [ "### Set Google Cloud project information\n", "\n", "To get started using Vertex AI, you must have an existing Google Cloud project. Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Nqwi-5ufWp_B" }, "outputs": [], "source": [ "PROJECT_ID = \"[your-project-id]\" # @param {type:\"string\"}\n", "LOCATION = \"us-central1\" # @param {type:\"string\"}" ] }, { "cell_type": "markdown", "metadata": { "id": "zgPO1eR3CYjk" }, "source": [ "### Create a Cloud Storage bucket\n", "\n", "Create a storage bucket to store intermediate artifacts such as datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "MzGDU7TWdts_" }, "outputs": [], "source": [ "BUCKET_URI = f\"gs://your-bucket-name-{PROJECT_ID}-unique\" # @param {type:\"string\"}" ] }, { "cell_type": "markdown", "metadata": { "id": "-EcIXiGsCePi" }, "source": [ "**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NIq7R4HZCfIc" }, "outputs": [], "source": [ "! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}" ] }, { "cell_type": "markdown", "metadata": { "id": "set_service_account" }, "source": [ "#### Service Account\n", "\n", "**If you don't know your service account**, create your service account using the `gcloud` command by executing the second cell below." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "set_service_account" }, "outputs": [], "source": [ "SERVICE_ACCOUNT = \"[your-service-account]\" # @param {type:\"string\"}" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "autoset_service_account" }, "outputs": [], "source": [ "import sys\n", "\n", "IS_COLAB = \"google.colab\" in sys.modules\n", "if (\n", " SERVICE_ACCOUNT == \"\"\n", " or SERVICE_ACCOUNT is None\n", " or SERVICE_ACCOUNT == \"[your-service-account]\"\n", "):\n", " # Get your service account from gcloud\n", " if not IS_COLAB:\n", " shell_output = !gcloud auth list 2>/dev/null\n", " SERVICE_ACCOUNT = shell_output[2].replace(\"*\", \"\").strip()\n", "\n", " if IS_COLAB:\n", " shell_output = ! gcloud projects describe $PROJECT_ID\n", " project_number = shell_output[-1].split(\":\")[1].strip().replace(\"'\", \"\")\n", " SERVICE_ACCOUNT = f\"{project_number}-compute@developer.gserviceaccount.com\"\n", "\n", " print(\"Service Account:\", SERVICE_ACCOUNT)" ] }, { "cell_type": "markdown", "metadata": { "id": "set_service_account:pipelines" }, "source": [ "#### Set service account access for Vertex AI Pipelines\n", "\n", "Run the following commands to grant your service account permission to read and write pipeline artifacts in the bucket created in the previous step. You only need to run these once per service account." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "set_service_account:pipelines" }, "outputs": [], "source": [ "! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI\n", "\n", "! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI" ] }, { "cell_type": "markdown", "metadata": { "id": "setup_vars" }, "source": [ "### Import libraries and define constants" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "import_aip:mbsdk" }, "outputs": [], "source": [ "import json\n", "\n", "from google.cloud import aiplatform\n", "from kfp import compiler, dsl\n", "from kfp.dsl import component" ] }, { "cell_type": "markdown", "metadata": { "id": "pipeline_constants" }, "source": [ "#### Vertex AI Pipelines constants\n", "\n", "Setup up the following constants for Vertex AI Pipelines:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pipeline_constants" }, "outputs": [], "source": [ "PIPELINE_ROOT = \"{}/pipeline_root/control\".format(BUCKET_URI)" ] }, { "cell_type": "markdown", "metadata": { "id": "init_aip:mbsdk" }, "source": [ "## Initialize Vertex AI SDK for Python\n", "\n", "Initialize the Vertex AI SDK for Python for your project and corresponding bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "init_aip:mbsdk" }, "outputs": [], "source": [ "aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)" ] }, { "cell_type": "markdown", "metadata": { "id": "define_component:coin" }, "source": [ "## Define pipeline components\n", "\n", "The following example defines three simple pipeline components:\n", "\n", "- A component that generates a list of dicts and returns a stringified json.\n", "(**Note**: This component requires an `import json` in the component function definition)\n", "- A component that just prints its input string\n", "- A component that does a 'coin flip' and returns `heads` or `tails`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "define_component:coin" }, "outputs": [], "source": [ "@component\n", "def args_generator_op() -> str:\n", " import json\n", "\n", " return json.dumps(\n", " [{\"cats\": \"1\", \"dogs\": \"2\"}, {\"cats\": \"10\", \"dogs\": \"20\"}],\n", " sort_keys=True,\n", " )\n", "\n", "\n", "@component\n", "def print_op(msg: str):\n", " print(msg)\n", "\n", "\n", "@component\n", "def flip_coin_op() -> str:\n", " \"\"\"Flip a coin and return heads or tails randomly.\"\"\"\n", " import random\n", "\n", " result = \"heads\" if random.randint(0, 1) == 0 else \"tails\"\n", " return result" ] }, { "cell_type": "markdown", "metadata": { "id": "define_pipeline:control" }, "source": [ "## Define a pipeline that uses control structures\n", "\n", "The following example defines a pipeline component that demonstrates the use of `dsl.Condition` and `dsl.ParallelFor`.\n", "\n", "The default value of `json_string` is a nested JSON list converted to a string. According to the pipeline definition, the loop and conditional expressions process this string as a list, and access list items and sub-items.\n", "The same holds for the list output by the `args_generator_op`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "define_pipeline:control" }, "outputs": [], "source": [ "@dsl.pipeline(\n", " name=\"control\",\n", " pipeline_root=PIPELINE_ROOT,\n", ")\n", "def pipeline(\n", " json_string: str = json.dumps(\n", " [\n", " {\n", " \"snakes\": \"anaconda\",\n", " \"lizards\": \"anole\",\n", " \"bunnies\": [{\"cottontail\": \"bugs\"}, {\"cottontail\": \"thumper\"}],\n", " },\n", " {\n", " \"snakes\": \"cobra\",\n", " \"lizards\": \"gecko\",\n", " \"bunnies\": [{\"cottontail\": \"roger\"}],\n", " },\n", " {\n", " \"snakes\": \"boa\",\n", " \"lizards\": \"iguana\",\n", " \"bunnies\": [\n", " {\"cottontail\": \"fluffy\"},\n", " {\"fuzzy_lop\": \"petunia\", \"cottontail\": \"peter\"},\n", " ],\n", " },\n", " ],\n", " sort_keys=True,\n", " )\n", "):\n", "\n", " flip1 = flip_coin_op()\n", "\n", " with dsl.Condition(\n", " flip1.output != \"no-such-result\", name=\"alwaystrue\"\n", " ): # always true\n", "\n", " args_generator = args_generator_op()\n", " with dsl.ParallelFor(args_generator.output) as item:\n", " print_op(msg=json_string)\n", "\n", " with dsl.Condition(flip1.output == \"heads\", name=\"heads\"):\n", " print_op(msg=item.cats)\n", "\n", " with dsl.Condition(flip1.output == \"tails\", name=\"tails\"):\n", " print_op(msg=item.dogs)\n", "\n", " with dsl.ParallelFor(json_string) as item:\n", " with dsl.Condition(item.snakes == \"boa\", name=\"snakes\"):\n", " print_op(msg=item.snakes)\n", " print_op(msg=item.lizards)\n", " print_op(msg=item.bunnies)\n", "\n", " # it is possible to access sub-items\n", " with dsl.ParallelFor(json_string) as item:\n", " with dsl.ParallelFor(item.bunnies) as item_bunnies:\n", " print_op(msg=item_bunnies.cottontail)" ] }, { "cell_type": "markdown", "metadata": { "id": "compile_pipeline" }, "source": [ "## Compile the pipeline\n", "\n", "Next, compile the pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "compile_pipeline" }, "outputs": [], "source": [ "compiler.Compiler().compile(\n", " pipeline_func=pipeline, package_path=\"control_pipeline.yaml\"\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "run_pipeline:control" }, "source": [ "## Run the pipeline\n", "\n", "Next, run the pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "run_pipeline:control" }, "outputs": [], "source": [ "DISPLAY_NAME = \"control\"\n", "\n", "job = aiplatform.PipelineJob(\n", " display_name=DISPLAY_NAME,\n", " template_path=\"control_pipeline.yaml\",\n", " pipeline_root=PIPELINE_ROOT,\n", ")\n", "\n", "job.run()" ] }, { "cell_type": "markdown", "metadata": { "id": "view_pipeline_run:control" }, "source": [ "Click on the generated link to see your run in the Cloud Console.\n", "\n", "<!-- It should look something like this as it is running:\n", "\n", "<a href=\"https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png\" target=\"_blank\"><img src=\"https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png\" width=\"40%\"/></a> -->\n", "\n", "In the Google Cloud Console, you can click on the pipeline DAG nodes to expand or collapse them. Here's a partially-expanded view of the DAG (click image to see larger version).\n", "\n", "<a href=\"https://storage.googleapis.com/amy-jo/images/mp/control_flow_dag.png\" target=\"_blank\"><img src=\"https://storage.googleapis.com/amy-jo/images/mp/control_flow_dag.png\" width=\"95%\"/></a>\n", "\n", "You can see, for example, that the 'heads' condition passed, and the 'tails' condition failed as expected." ] }, { "cell_type": "markdown", "metadata": { "id": "cleanup:pipelines" }, "source": [ "# Cleaning up\n", "\n", "To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud\n", "project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n", "\n", "Otherwise, you can delete the individual resources you created in this tutorial." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cleanup:pipelines" }, "outputs": [], "source": [ "delete_bucket = False\n", "\n", "# Delete the pipeline job\n", "job.delete()\n", "\n", "# Delete the locally generated files\n", "! rm control_pipeline.yaml\n", "\n", "if delete_bucket:\n", " ! gsutil rm -r $BUCKET_URI" ] } ], "metadata": { "colab": { "name": "control_flow_kfp.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }