{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "copyright"
},
"outputs": [],
"source": [
"# Copyright 2022 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "title:generic"
},
"source": [
"# Vertex AI Pipelines: Pipelines introduction for KFP\n",
"\n",
"<table align=\"left\">\n",
" <td style=\"text-align: center\">\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/pipelines_intro_kfp.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Google Colaboratory logo\"><br> Open in Colab\n",
" </a>\n",
" </td>\n",
" <td style=\"text-align: center\">\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Fpipelines_intro_kfp.ipynb\">\n",
" <img width=\"32px\" src=\"https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png\" alt=\"Google Cloud Colab Enterprise logo\"><br> Open in Colab Enterprise\n",
" </a>\n",
" </td> \n",
" <td style=\"text-align: center\">\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/pipelines_intro_kfp.ipynb\">\n",
" <img src=\"https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32\" alt=\"Vertex AI logo\"><br> Open in Workbench\n",
" </a>\n",
" </td>\n",
" <td style=\"text-align: center\">\n",
" <a href=\"https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/pipelines_intro_kfp.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\"><br> View on GitHub\n",
" </a>\n",
" </td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "overview:pipelines,intro"
},
"source": [
"## Overview\n",
"\n",
"This notebook provides an introduction for using [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines) with [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/).\n",
"\n",
"Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "objective:pipelines,intro"
},
"source": [
"### Objective\n",
"\n",
"In this tutorial, you learn how to use the KFP SDK for Python to build pipelines that generate evaluation metrics.\n",
"\n",
"This tutorial uses the following Vertex AI services:\n",
"\n",
"- Vertex AI Pipelines\n",
"\n",
"The steps performed include:\n",
"\n",
"- Define and compile a Vertex AI pipeline.\n",
"- Specify which service account to use for a pipeline run.\n",
"- Run the pipeline using Vertex AI SDK for Python and REST API."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "costs:functions,scheduler"
},
"source": [
"### Costs\n",
"\n",
"This tutorial uses billable components of Google Cloud:\n",
"\n",
"* Vertex AI\n",
"* Cloud Storage\n",
"\n",
"Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing),\n",
"[Cloud Storage pricing](https://cloud.google.com/storage/pricing),\n",
"and use the [Pricing\n",
"Calculator](https://cloud.google.com/products/calculator/)\n",
"to generate a cost estimate based on your projected usage."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "d0be1c1c229a"
},
"source": [
"## Get started"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "install_aip:mbsdk"
},
"source": [
"### Install Vertex AI SDK for Python and other required packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "1fd00fa70a2a"
},
"outputs": [],
"source": [
"# Install the packages\n",
"! pip3 install --upgrade google-cloud-aiplatform \\\n",
" google-cloud-storage \\\n",
" kfp \\\n",
" google-cloud-pipeline-components"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "blGlVGFYW9Pt"
},
"source": [
"### Restart runtime (Colab only)\n",
"\n",
"To use the newly installed packages, you must restart the runtime on Google Colab."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "0JrvuK6LUYnQ"
},
"outputs": [],
"source": [
"import sys\n",
"\n",
"if \"google.colab\" in sys.modules:\n",
"\n",
" import IPython\n",
"\n",
" app = IPython.Application.instance()\n",
" app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ffcde4d56c00"
},
"source": [
"<div class=\"alert alert-block alert-warning\">\n",
"<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>\n",
"</div>\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "check_versions"
},
"source": [
"Check the versions of the packages you installed. The KFP SDK version should be >=1.6."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "check_versions:kfp"
},
"outputs": [],
"source": [
"! python3 -c \"import kfp; print('KFP SDK version: {}'.format(kfp.__version__))\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5dccb1c8feb6"
},
"source": [
"### Authenticate your notebook environment (Colab only)\n",
"\n",
"Authenticate your environment on Google Colab.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "cc7251520a07"
},
"outputs": [],
"source": [
"import sys\n",
"\n",
"if \"google.colab\" in sys.modules:\n",
"\n",
" from google.colab import auth\n",
"\n",
" auth.authenticate_user()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "project_id"
},
"source": [
"### Set Google Cloud project information \n",
"\n",
"Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "3c8049930470"
},
"outputs": [],
"source": [
"PROJECT_ID = \"[your-project-id]\" # @param {type:\"string\"}\n",
"LOCATION = \"us-central1\" # @param {type:\"string\"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "bucket:custom"
},
"source": [
"### Create a Cloud Storage bucket\n",
"\n",
"Create a storage bucket to store intermediate artifacts such as datasets."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "bucket"
},
"outputs": [],
"source": [
"BUCKET_URI = f\"gs://your-bucket-name-{PROJECT_ID}-unique\" # @param {type:\"string\"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "create_bucket"
},
"source": [
"**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Oz8J0vmSlugt"
},
"outputs": [],
"source": [
"! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "set_service_account:pipelines"
},
"source": [
"#### Set service account access for Vertex AI Pipelines\n",
"\n",
"Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run this step once per service account."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "77b01a1fdbb4"
},
"outputs": [],
"source": [
"SERVICE_ACCOUNT = \"[your-service-account]\" # @param {type:\"string\"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "f936bebda2d4"
},
"outputs": [],
"source": [
"import sys\n",
"\n",
"IS_COLAB = \"google.colab\" in sys.modules\n",
"if (\n",
" SERVICE_ACCOUNT == \"\"\n",
" or SERVICE_ACCOUNT is None\n",
" or SERVICE_ACCOUNT == \"[your-service-account]\"\n",
"):\n",
" # Get your service account from gcloud\n",
" if not IS_COLAB:\n",
" shell_output = !gcloud auth list 2>/dev/null\n",
" SERVICE_ACCOUNT = shell_output[2].replace(\"*\", \"\").strip()\n",
"\n",
" else: # IS_COLAB:\n",
" shell_output = ! gcloud projects describe $PROJECT_ID\n",
" project_number = shell_output[-1].split(\":\")[1].strip().replace(\"'\", \"\")\n",
" SERVICE_ACCOUNT = f\"{project_number}-compute@developer.gserviceaccount.com\"\n",
"\n",
" print(\"Service Account:\", SERVICE_ACCOUNT)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2b7704b6936c"
},
"source": [
"Grant [*Storage Object Creator*](https://cloud.google.com/iam/docs/understanding-roles#storage.objectCreator) and [*Storage Object Viewer*](https://cloud.google.com/iam/docs/understanding-roles#storage.objectViewer) roles to your service account."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "0QNsyzEF2Ou4"
},
"outputs": [],
"source": [
"! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI\n",
"\n",
"! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "setup_vars"
},
"source": [
"### Import the required libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "import_aip:mbsdk"
},
"outputs": [],
"source": [
"import json\n",
"from typing import NamedTuple\n",
"\n",
"from google.cloud import aiplatform\n",
"from kfp import compiler, dsl\n",
"from kfp.dsl import component"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ce2821c0912c"
},
"source": [
"### Initialize Vertex AI SDK for Python\n",
"\n",
"To get started using Vertex AI, you must [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com). "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "d5906c0d5985"
},
"outputs": [],
"source": [
"aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "aip_constants:endpoint"
},
"source": [
"#### Vertex AI constants\n",
"\n",
"Setup up the following constants for Vertex AI:\n",
"\n",
"- `API_ENDPOINT`: The Vertex AI API-service endpoint for dataset, model, job, pipeline and endpoint services.\n",
"- `PIPELINE_ROOT`: Cloud Storage location which is treated as the root output directory of the pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "TnO8gVBb2Ou4"
},
"outputs": [],
"source": [
"# API service endpoint\n",
"API_ENDPOINT = f\"{LOCATION}-aiplatform.googleapis.com\"\n",
"# Pipelne root dir\n",
"PIPELINE_ROOT = f\"{BUCKET_URI}/pipeline_root/intro\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "define_component:hello_world"
},
"source": [
"## Define Python function-based pipeline components\n",
"\n",
"In this tutorial, you define a simple pipeline that has three steps, where each step is defined as a component.\n",
"\n",
"### Define *hello world* component\n",
"\n",
"First, define a component based on a very simple Python function. It takes a string input parameter and returns the value as output.\n",
"\n",
"Note the usage of `@component` decorator, which compiles the function to a KFP component when evaluated. The below example specifies a base image for the component (`python:3.9`), and a component YAML file, `hw.yaml`. The compiled component specification is written to the YAML file. (The default base image is `python:3.7`, which works too)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "GjJhJUID2Ou6"
},
"outputs": [],
"source": [
"@component(base_image=\"python:3.9\")\n",
"def hello_world(text: str) -> str:\n",
" print(text)\n",
" return text\n",
"\n",
"\n",
"compiler.Compiler().compile(hello_world, \"hw.yaml\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "SWcIXuxR2Ou6"
},
"source": [
"As you see below, compilation of this component creates a [task factory function](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/)—called `hello_world`— that you can use in defining a pipeline step.\n",
"\n",
"While not shown here, if you want to share this component definition, or use it in another context, you can load it from its yaml file as below:\n",
"\n",
"`hello_world_op = components.load_component_from_file('./hw.yaml')`\n",
"\n",
"You can also use the `load_component_from_url` method, if your component yaml file is stored online. (For GitHub URLs, load the 'raw' file.)"
]
},
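{
"cell_type": "markdown",
"metadata": {
"id": "load_component_example"
},
"source": [
"The following is a minimal sketch of reusing the compiled component, assuming the `hw.yaml` file written by the previous code cell is in the current working directory and that you are using the KFP v2 SDK. The names `hello_world_op` and `reuse_pipeline` are illustrative and aren't used elsewhere in this notebook.\n",
"\n",
"```python\n",
"from kfp import components, dsl\n",
"\n",
"# Rebuild a task factory function from the compiled component specification.\n",
"hello_world_op = components.load_component_from_file(\"./hw.yaml\")\n",
"\n",
"\n",
"@dsl.pipeline(name=\"reuse-hello-world\")\n",
"def reuse_pipeline(text: str = \"hi again\"):\n",
"    # The loaded component is called like the decorated function it came from.\n",
"    hello_world_op(text=text)\n",
"```\n",
"\n",
"Loading from the YAML file lets another notebook or repository reuse the component without access to the original Python source."
]
},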
{
"cell_type": "markdown",
"metadata": {
"id": "define_component:two_outputs"
},
"source": [
"### Define *two outputs* component\n",
"\n",
"The first component below i.e., `two_outputs`, demonstrates installing a package. In this case, the `google-cloud-storage` package is installed. Alternatively, you can specify a base image that includes the necessary installations.\n",
"\n",
"**Note:** The component function doesn't actually use the package.\n",
"\n",
"The `two_outputs` component returns two named outputs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "U4Yv33su2Ou6"
},
"outputs": [],
"source": [
"@component(packages_to_install=[\"google-cloud-storage\"])\n",
"def two_outputs(\n",
" text: str,\n",
") -> NamedTuple(\n",
" \"Outputs\",\n",
" [\n",
" (\"output_one\", str), # Return parameters\n",
" (\"output_two\", str),\n",
" ],\n",
"):\n",
" # the import is not actually used for this simple example, but the import\n",
" # is successful, as it was included in the `packages_to_install` list.\n",
" from google.cloud import storage # noqa: F401\n",
"\n",
" o1 = f\"output one from text: {text}\"\n",
" o2 = f\"output two from text: {text}\"\n",
" print(\"output one: {}; output_two: {}\".format(o1, o2))\n",
" return (o1, o2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "define_component:consumer"
},
"source": [
"### Define *consumer* component\n",
"\n",
"The third component, `consumer`, takes three string inputs, prints them and returns them as the output."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "bu8XvOj82Ou6"
},
"outputs": [],
"source": [
"@component\n",
"def consumer(text1: str, text2: str, text3: str) -> str:\n",
" print(f\"text1: {text1}; text2: {text2}; text3: {text3}\")\n",
" return f\"text1: {text1}; text2: {text2}; text3: {text3}\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "define_pipeline:intro"
},
"source": [
"### Define a pipeline that uses the components\n",
"\n",
"Next, define a pipeline that uses the above three components.\n",
"\n",
"By evaluating the component definitions above, you've created task factory functions that are used in the pipeline definition for creating the pipeline steps.\n",
"\n",
"The pipeline takes an input parameter, and passes that parameter as an argument to the first two pipeline steps (`hw_task` and `two_outputs_task`).\n",
"\n",
"Then, the third pipeline step (`consumer_task`) consumes the outputs of the first and second steps. Because the `hello_world` component definition just returns one unnamed output, you refer to it as `hw_task.output`. The `two_outputs` task returns two named outputs, which you access as `two_outputs_task.outputs[\"<output_name>\"]`.\n",
"\n",
"**Note:** In the `@dsl.pipeline` decorator, you define `PIPELINE_ROOT` as the Cloud Storage path that is used as root folder. You can choose to skip it, but you have to provide it when creating the pipeline run."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "CV5dRAeJ2Ou7"
},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" name=\"intro-pipeline-unique\",\n",
" description=\"A simple intro pipeline\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
")\n",
"def pipeline(text: str = \"hi there\"):\n",
" hw_task = hello_world(text=text)\n",
" two_outputs_task = two_outputs(text=text)\n",
" consumer_task = consumer( # noqa: F841\n",
" text1=hw_task.output,\n",
" text2=two_outputs_task.outputs[\"output_one\"],\n",
" text3=two_outputs_task.outputs[\"output_two\"],\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "compile_pipeline"
},
"source": [
"## Compile the pipeline\n",
"\n",
"Next, compile the pipeline to a JSON file.\n",
"\n",
"**Note:** You can also compile the pipeline to a YAML file but some REST functionality may not work while using the file in REST API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "VP_JJ9Oe2Ou7"
},
"outputs": [],
"source": [
"compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"intro_pipeline.json\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "run_pipeline:intro"
},
"source": [
"## Run the pipeline\n",
"\n",
"Now, run the pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "sjxaBix_2Ou7"
},
"outputs": [],
"source": [
"DISPLAY_NAME = \"intro_pipeline_job_unique\"\n",
"\n",
"job = aiplatform.PipelineJob(\n",
" display_name=DISPLAY_NAME,\n",
" template_path=\"intro_pipeline.json\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
")\n",
"\n",
"job.run()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "view_pipeline_run:intro"
},
"source": [
"Click on the generated link to see your run in the Cloud Console.\n",
"\n",
"<!-- It should look something like this as it runs:\n",
"\n",
"<a href=\"https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png\" target=\"_blank\"><img src=\"https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png\" width=\"40%\"/></a> -->\n",
"\n",
"In the UI, many of the pipeline DAG nodes expand or collapse when you click on them. Here is a partially-expanded view of the DAG (click image to see larger version).\n",
"\n",
"<a href=\"https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png\" target=\"_blank\"><img src=\"https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png\" width=\"60%\"/></a>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "d428cc8803e7"
},
"source": [
"### Delete the pipeline job\n",
"\n",
"You can delete the pipeline job using the `delete()` method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "97802d9432e7"
},
"outputs": [],
"source": [
"job.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "run_pipeline_service_account"
},
"source": [
"## Specifying a service account to use for a pipeline run\n",
"\n",
"By default, the [service account](https://cloud.google.com/iam/docs/service-accounts) used for your pipeline run is your [default compute engine service account](https://cloud.google.com/compute/docs/access/service-accounts#default_service_account).\n",
"However, you might want to run pipelines with permissions to access different roles than those configured for your default service account. For example, you may need to use a more restricted set of permissions.\n",
"\n",
"\n",
"Once your service account is created and configured, pass it as an argument to the `create_run_from_job_spec` method. The pipeline job runs with the permissions of the given service account.\n",
"\n",
"Learn more about [creating and configuring a service account to work with Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "CHcFxq742Ou9"
},
"outputs": [],
"source": [
"DISPLAY_NAME = \"intro_pipeline_job_svc_acc\"\n",
"\n",
"job = aiplatform.PipelineJob(\n",
" display_name=DISPLAY_NAME,\n",
" template_path=\"intro_pipeline.json\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
")\n",
"\n",
"job.run(\n",
" service_account=SERVICE_ACCOUNT\n",
") # <-- CHANGE to use non-default service account"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5219b22d12ed"
},
"source": [
"### Delete the pipeline job\n",
"\n",
"You can delete the pipeline job using the `delete()` method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "b8d17de94944"
},
"outputs": [],
"source": [
"job.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "run_pipeline_caching"
},
"source": [
"## Pipeline step caching\n",
"\n",
"By default, pipeline step caching is enabled. This means that the results of previous step executions are reused when possible.\n",
"\n",
"If you want to disable caching for a pipeline run, you can set the `enable_caching` parameter as **False** when creating the `PipelineJob` object. \n",
"\n",
"Try submitting the pipeline job twice: once with `enable_caching` set to **True**, and the other time with `enable_caching` set to **False**."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "VrM5wqjh2Ou9"
},
"outputs": [],
"source": [
"job = aiplatform.PipelineJob(\n",
" display_name=\"intro_pipeline_job_cached_unique\",\n",
" template_path=\"intro_pipeline.json\",\n",
" enable_caching=False,\n",
")\n",
"\n",
"job.run()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "37e2b84a0c0d"
},
"source": [
"### Delete the pipeline job\n",
"\n",
"You can delete the pipeline job using the `delete()` method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "c5926e7eaad8"
},
"outputs": [],
"source": [
"job.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_rest"
},
"source": [
"## Using the Pipelines REST API\n",
"\n",
"At times you may want to use the REST API instead of the Python KFP SDK. Below are examples of how to do that.\n",
"\n",
"Where a command requires a pipeline ID, you can get that data from the \"Run\" column in the pipelines list as shown below, as well as from the \"details\" page for a given pipeline. You can see the pipeline details using the list method for pipeline jobs API.\n",
"\n",
"<a href=\"https://storage.googleapis.com/amy-jo/images/mp/pipeline_id.png\" target=\"_blank\"><img src=\"https://storage.googleapis.com/amy-jo/images/mp/pipeline_id.png\" width=\"80%\"/></a>"
]
},
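{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_sdk_list_ids"
},
"source": [
"If you prefer to look up job IDs programmatically, the following is a minimal sketch that uses the Vertex AI SDK for Python. It assumes `aiplatform.init()` was already called with your project and location, as done earlier in this notebook. The loop variable `pipeline_job` is an illustrative name.\n",
"\n",
"```python\n",
"from google.cloud import aiplatform\n",
"\n",
"# List the pipeline jobs in the project and location set by aiplatform.init().\n",
"for pipeline_job in aiplatform.PipelineJob.list():\n",
"    # `name` is the short job ID; `resource_name` is the full resource path.\n",
"    print(pipeline_job.name, pipeline_job.display_name, pipeline_job.state)\n",
"```\n",
"\n",
"The short ID printed first is the value that the REST requests below expect after `pipelineJobs/`."
]
},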
{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_rest_list"
},
"source": [
"### List pipeline jobs\n",
"\n",
"**Note:** This request may generate a large response if you have many pipeline runs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "1nJSqELi2Ou-"
},
"outputs": [],
"source": [
"! curl -X GET -H \"Authorization: Bearer $(gcloud auth print-access-token)\" https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_rest_create"
},
"source": [
"### Create a pipeline job\n",
"\n",
"To send a REST request for pipeline job creation, you need to include the pipeline job specification details. \n",
"\n",
"For this reason, load the previously compiled pipeline specification details to a json object and include it in your pipeline configuration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "feca6b12a940"
},
"outputs": [],
"source": [
"# Load the pipeline specification\n",
"with open(\"intro_pipeline.json\") as fp:\n",
" pipeline_job_spec = json.load(fp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "c4414fd14e9f"
},
"outputs": [],
"source": [
"# Specify the pipeline configuration details\n",
"pipeline_config = {\n",
" \"displayName\": \"intro-pipeline-rest-api\",\n",
" \"runtimeConfig\": {\n",
" \"gcsOutputDirectory\": PIPELINE_ROOT,\n",
" },\n",
" \"pipelineSpec\": pipeline_job_spec,\n",
"}\n",
"\n",
"# Save the configuration to a json file\n",
"with open(\"pipeline_config.json\", \"w\") as fp:\n",
" json.dump(pipeline_config, fp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "iEUcY8aQ2Ou-"
},
"outputs": [],
"source": [
"# Set a job ID (optional)\n",
"PIPELINE_RUN_ID = \"intro-pipeline-job-unique\"\n",
"\n",
"# Send the job creation request using the configuration payload\n",
"output = ! curl -X POST -H \"Authorization: Bearer $(gcloud auth print-access-token)\" -H \"Content-Type: application/json; charset=utf-8\" https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs?pipelineJobId={PIPELINE_RUN_ID} --data \"@pipeline_config.json\"\n",
"\n",
"# In case you didn't use a pre-defined PipelineJobId, Vertex AI\n",
"# generates one automatically. In such a case, use the following\n",
"# commented code to retrieve the generated job id.\n",
"# output_json = json.loads(\" \".join(output))\n",
"# PIPELINE_RUN_ID = output_json['name'].split(\"/\")[-1]\n",
"# print(PIPELINE_RUN_ID)"
]
},
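{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_rest_parse_job_id"
},
"source": [
"The commented-out lines in the previous cell extract the job ID from the creation response. The following is a minimal sketch of that step on its own, using a hypothetical, abbreviated response body (a real response contains many more fields, and the project number `123` is a placeholder). The helper name `job_id_from_response` is illustrative.\n",
"\n",
"```python\n",
"import json\n",
"\n",
"# Hypothetical, abbreviated response body; a real response has more fields.\n",
"sample_body = json.dumps(\n",
"    {\n",
"        \"name\": \"projects/123/locations/us-central1/pipelineJobs/intro-pipeline-job-unique\",\n",
"        \"displayName\": \"intro-pipeline-rest-api\",\n",
"    }\n",
")\n",
"\n",
"\n",
"def job_id_from_response(body: str) -> str:\n",
"    # The job ID is the last segment of the `name` resource path.\n",
"    return json.loads(body)[\"name\"].split(\"/\")[-1]\n",
"\n",
"\n",
"print(job_id_from_response(sample_body))\n",
"```\n",
"\n",
"Taking the last path segment works whether you supplied `pipelineJobId` yourself or let Vertex AI generate one."
]
},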
{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_rest_get"
},
"source": [
"### Get pipeline job details using ID"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "D-QEplm82Ou-"
},
"outputs": [],
"source": [
"! curl -X GET -H \"Authorization: Bearer $(gcloud auth print-access-token)\" https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{PIPELINE_RUN_ID}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_rest_cancel"
},
"source": [
"### Cancel pipeline job using ID\n",
"\n",
"**Note:** If your pipeline has already executed successfully before you reach this step, you encounter an error response stating the same."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "YlJeXYBK2Ou_"
},
"outputs": [],
"source": [
"! curl -X POST -H \"Authorization: Bearer $(gcloud auth print-access-token)\" https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{PIPELINE_RUN_ID}:cancel"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "pipelines_rest_delete"
},
"source": [
"### Delete pipeline job using ID"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "WIprXZMc2Ou_"
},
"outputs": [],
"source": [
"! curl -X DELETE -H \"Authorization: Bearer $(gcloud auth print-access-token)\" https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{PIPELINE_RUN_ID}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cleanup:pipelines"
},
"source": [
"# Cleaning up\n",
"\n",
"To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud\n",
"project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n",
"\n",
"Otherwise, you can delete the individual resources you created in this tutorial."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "-eXdbQCo2Ou_"
},
"outputs": [],
"source": [
"# Delete the Cloud Storage bucket\n",
"delete_bucket = False # Set True for deletion\n",
"\n",
"if delete_bucket:\n",
" ! gsutil rm -r $BUCKET_URI\n",
"\n",
"# Delete the locally generated files\n",
"! rm intro_pipeline.json\n",
"! rm pipeline_config.json"
]
}
],
"metadata": {
"colab": {
"name": "pipelines_intro_kfp.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 0
}