In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Pipelines introduction for KFP

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/pipelines_intro_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Fpipelines_intro_kfp.ipynb">
      <img width="32px" src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/pipelines_intro_kfp.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/pipelines_intro_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

## Overview

This notebook provides an introduction for using [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines) with [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/).

Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

### Objective

In this tutorial, you learn how to use the KFP SDK for Python to build a simple pipeline and run it on Vertex AI Pipelines.

This tutorial uses the following Vertex AI services:

- Vertex AI Pipelines

The steps performed include:

- Define and compile a Vertex AI pipeline.
- Specify which service account to use for a pipeline run.
- Run the pipeline using Vertex AI SDK for Python and REST API.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing),
[Cloud Storage pricing](https://cloud.google.com/storage/pricing),
and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Get started

### Install Vertex AI SDK for Python and other required packages

In [None]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                         google-cloud-storage \
                         kfp \
                         google-cloud-pipeline-components

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

# The restart is only needed on Colab, which is detected by the presence of
# the `google.colab` module in the running interpreter.
if "google.colab" in sys.modules:
    import IPython

    # Shut the kernel down with the restart flag set so it comes back up and
    # picks up the newly installed packages.
    IPython.Application.instance().kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


Check the version of the KFP SDK you installed. This notebook uses the KFP v2 API (for example, `from kfp import compiler, dsl`), so the KFP SDK version should be >=2.0.

In [None]:
# Print the version of the KFP SDK as seen by the `python3` interpreter.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [None]:
import sys

# Interactive authentication is only performed on Colab, which is detected by
# the presence of the `google.colab` module.
if "google.colab" in sys.modules:

    from google.colab import auth

    # Authenticate the current user for this notebook session.
    auth.authenticate_user()

### Set Google Cloud project information 

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
# Google Cloud project used by every command in this notebook; replace the
# placeholder with your own project ID.
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
# Region used for the bucket, the Vertex AI SDK and the REST endpoint below.
LOCATION = "us-central1"  # @param {type:"string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
# Cloud Storage bucket for pipeline artifacts. The project ID is embedded in
# the name; replace the remaining parts with your own bucket name.
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
# Create the bucket in LOCATION under PROJECT_ID (skip if it already exists).
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run this step once per service account.

In [None]:
# Service account that is granted bucket access and later used for a pipeline
# run. Leave the placeholder to have the next cell look one up through gcloud.
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

Grant [*Storage Object Creator*](https://cloud.google.com/iam/docs/understanding-roles#storage.objectCreator) and [*Storage Object Viewer*](https://cloud.google.com/iam/docs/understanding-roles#storage.objectViewer) roles to your service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Import the required libraries

In [None]:
import json
from typing import NamedTuple

from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import component

### Initialize Vertex AI SDK for Python

To get started using Vertex AI, you must [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com). 

In [None]:
# Configure the Vertex AI SDK with the project, region and staging bucket
# defined in the cells above.
aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)

#### Vertex AI constants

Set up the following constants for Vertex AI:

- `API_ENDPOINT`: The Vertex AI API-service endpoint for dataset, model, job, pipeline and endpoint services.
- `PIPELINE_ROOT`: Cloud Storage location which is treated as the root output directory of the pipeline.

In [None]:
# Regional API service endpoint (host name only; used by the REST calls below)
API_ENDPOINT = f"{LOCATION}-aiplatform.googleapis.com"
# Pipeline root directory inside the bucket
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/intro"

## Define Python function-based pipeline components

In this tutorial, you define a simple pipeline that has three steps, where each step is defined as a component.

### Define *hello world* component

First, define a component based on a very simple Python function. It takes a string input parameter and returns the value as output.

Note the usage of the `@component` decorator, which turns the function into a KFP component when evaluated. The example below specifies a base image for the component (`python:3.9`). The `compiler.Compiler().compile()` call then writes the compiled component specification to the YAML file `hw.yaml`. (If you omit `base_image`, KFP falls back to its default Python base image, which depends on the installed SDK version and works too.)

In [None]:
@component(base_image="python:3.9")
def hello_world(text: str) -> str:
    """Print the given text and return it unchanged.

    Args:
        text: The string to print.

    Returns:
        The same string that was passed in.
    """
    print(text)
    return text


# Write the compiled component specification to `hw.yaml`.
compiler.Compiler().compile(hello_world, "hw.yaml")

As you see below, compilation of this component creates a [task factory function](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/)—called `hello_world`— that you can use in defining a pipeline step.

While not shown here, if you want to share this component definition, or use it in another context, you can load it from its YAML file (after running `from kfp import components`) as below:

`hello_world_op = components.load_component_from_file('./hw.yaml')`

You can also use the `load_component_from_url` method, if your component yaml file is stored online. (For GitHub URLs, load the 'raw' file.)

### Define *two outputs* component

The second component, `two_outputs`, demonstrates installing a package. In this case, the `google-cloud-storage` package is installed. Alternatively, you can specify a base image that includes the necessary installations.

**Note:** The component function doesn't actually use the package.

The `two_outputs` component returns two named outputs.

In [None]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    """Derive two strings from the input text and return both.

    Args:
        text: The string embedded in both outputs.

    Returns:
        A pair `(output_one, output_two)` of strings built from `text`.
    """
    # Deliberately unused: the import only shows that the package listed in
    # `packages_to_install` is available inside the component.
    from google.cloud import storage  # noqa: F401

    first_output = f"output one from text: {text}"
    second_output = f"output two from text: {text}"
    print("output one: {}; output_two: {}".format(first_output, second_output))
    return (first_output, second_output)

### Define *consumer* component

The third component, `consumer`, takes three string inputs, prints them and returns them as the output.

In [None]:
@component
def consumer(text1: str, text2: str, text3: str) -> str:
    """Print the three input strings in one line and return that line.

    Args:
        text1: First string to report.
        text2: Second string to report.
        text3: Third string to report.

    Returns:
        The same formatted line that was printed.
    """
    # Build the message once so the printed and returned text cannot diverge.
    summary = f"text1: {text1}; text2: {text2}; text3: {text3}"
    print(summary)
    return summary

### Define a pipeline that uses the components

Next, define a pipeline that uses the above three components.

By evaluating the component definitions above, you've created task factory functions that are used in the pipeline definition for creating the pipeline steps.

The pipeline takes an input parameter, and passes that parameter as an argument to the first two pipeline steps (`hw_task` and `two_outputs_task`).

Then, the third pipeline step (`consumer_task`) consumes the outputs of the first and second steps.  Because the `hello_world` component definition just returns one unnamed output, you refer to it as `hw_task.output`.  The `two_outputs` task returns two named outputs, which you access as `two_outputs_task.outputs["<output_name>"]`.

**Note:** In the `@dsl.pipeline` decorator, you define `PIPELINE_ROOT` as the Cloud Storage path that is used as root folder.  You can choose to skip it, but you have to provide it when creating the pipeline run.

In [None]:
# Three-step intro pipeline: `hello_world` and `two_outputs` both receive the
# pipeline's `text` parameter, and `consumer` receives the outputs of both.
@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there"):
    # The first two steps depend only on the pipeline input.
    hw_task = hello_world(text=text)
    two_outputs_task = two_outputs(text=text)
    # `hello_world` has a single unnamed output (`.output`); the outputs of
    # `two_outputs` are addressed by name through `.outputs[...]`.
    # The task object itself is not used afterwards, hence the `noqa`.
    consumer_task = consumer(  # noqa: F841
        text1=hw_task.output,
        text2=two_outputs_task.outputs["output_one"],
        text3=two_outputs_task.outputs["output_two"],
    )

## Compile the pipeline

Next, compile the pipeline to a JSON file.

**Note:** You can also compile the pipeline to a YAML file but some REST functionality may not work while using the file in REST API.

In [None]:
# Write the compiled pipeline specification to `intro_pipeline.json`; the
# later cells load this file as the job template and as the REST payload.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="intro_pipeline.json")

## Run the pipeline

Now, run the pipeline.

In [None]:
DISPLAY_NAME = "intro_pipeline_job_unique"

# Create a pipeline job from the compiled specification, storing its outputs
# under PIPELINE_ROOT.
job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="intro_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
)

# Run the pipeline job.
job.run()

Click on the generated link to see your run in the Cloud Console.

<!-- It should look something like this as it runs:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a> -->

In the UI, many of the pipeline DAG nodes expand or collapse when you click on them. Here is a partially-expanded view of the DAG (click image to see larger version).

<a href="https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png" width="60%"/></a>

### Delete the pipeline job

You can delete the pipeline job using the `delete()` method.

In [None]:
# Delete the pipeline job created in the previous cell.
job.delete()

## Specifying a service account to use for a pipeline run

By default, the [service account](https://cloud.google.com/iam/docs/service-accounts) used for your pipeline run is your [default compute engine service account](https://cloud.google.com/compute/docs/access/service-accounts#default_service_account).
However, you might want to run pipelines with permissions to access different roles than those configured for your default service account. For example, you may need to use a more restricted set of permissions.


Once your service account is created and configured, pass it as the `service_account` argument to the `run` method of the `PipelineJob` object. The pipeline job runs with the permissions of the given service account.

Learn more about [creating and configuring a service account to work with Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account).

In [None]:
DISPLAY_NAME = "intro_pipeline_job_svc_acc"

# Same compiled pipeline as before, submitted as a separate job.
job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="intro_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
)

# Run the job with the permissions of SERVICE_ACCOUNT.
job.run(
    service_account=SERVICE_ACCOUNT
)  # <-- CHANGE to use non-default service account

### Delete the pipeline job

You can delete the pipeline job using the `delete()` method.

In [None]:
# Delete the pipeline job that ran with the custom service account.
job.delete()

## Pipeline step caching

By default, pipeline step caching is enabled. This means that the results of previous step executions are reused when possible.

If you want to disable caching for a pipeline run, you can set the `enable_caching` parameter as **False** when creating the `PipelineJob` object. 

Try submitting the pipeline job twice: once with `enable_caching` set to **True**, and the other time with `enable_caching` set to **False**.

In [None]:
# Caching is disabled for this run, so the results of previous step
# executions are not reused. Flip `enable_caching` to True and resubmit to
# compare the two behaviours.
job = aiplatform.PipelineJob(
    display_name="intro_pipeline_job_cached_unique",
    template_path="intro_pipeline.json",
    # Passed explicitly, like in the other PipelineJob cells of this notebook.
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)

job.run()

### Delete the pipeline job

You can delete the pipeline job using the `delete()` method.

In [None]:
# Delete the pipeline job from the caching example.
job.delete()

## Using the Pipelines REST API

At times you may want to use the REST API instead of the Vertex AI SDK for Python.  Below are examples of how to do that.

Where a command requires a pipeline ID, you can get that data from the "Run" column in the pipelines list as shown below, as well as from the "details" page for a given pipeline. You can see the pipeline details using the list method for pipeline jobs API.

<a href="https://storage.googleapis.com/amy-jo/images/mp/pipeline_id.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/pipeline_id.png" width="80%"/></a>

### List pipeline jobs

**Note:** This request may generate a large response if you have many pipeline runs.

In [None]:
# List the pipeline jobs of the project/location, authenticating with an
# access token obtained from gcloud.
! curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs

### Create a pipeline job

To send a REST request for pipeline job creation, you need to include the pipeline job specification details. 

For this reason, load the previously compiled pipeline specification details to a json object and include it in your pipeline configuration.

In [None]:
# Load the pipeline specification
with open("intro_pipeline.json") as fp:
    pipeline_job_spec = json.load(fp)

In [None]:
# Specify the pipeline configuration details
pipeline_config = {
    "displayName": "intro-pipeline-rest-api",
    "runtimeConfig": {
        "gcsOutputDirectory": PIPELINE_ROOT,
    },
    "pipelineSpec": pipeline_job_spec,
}

# Save the configuration to a json file
with open("pipeline_config.json", "w") as fp:
    json.dump(pipeline_config, fp)

In [None]:
# Set a job ID (optional)
PIPELINE_RUN_ID = "intro-pipeline-job-unique"

# Send the job creation request using the configuration payload
output = ! curl -X POST  -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json; charset=utf-8"   https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs?pipelineJobId={PIPELINE_RUN_ID}  --data "@pipeline_config.json"

# In case you didn't use a pre-defined PipelineJobId, Vertex AI
# generates one automatically. In such a case, use the following
# commented code to retrieve the generated job id.
# output_json = json.loads(" ".join(output))
# PIPELINE_RUN_ID = output_json['name'].split("/")[-1]
# print(PIPELINE_RUN_ID)

### Get pipeline job details using ID

In [None]:
# Fetch the details of the pipeline job identified by PIPELINE_RUN_ID.
! curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{PIPELINE_RUN_ID}

### Cancel pipeline job using ID

**Note:** If your pipeline has already finished running before you reach this step, the request returns an error response saying so.

In [None]:
# Request cancellation of the pipeline job identified by PIPELINE_RUN_ID.
! curl -X POST -H "Authorization: Bearer $(gcloud auth print-access-token)" https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{PIPELINE_RUN_ID}:cancel

### Delete pipeline job using ID

In [None]:
# Delete the pipeline job identified by PIPELINE_RUN_ID.
! curl -X DELETE -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://{API_ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{LOCATION}/pipelineJobs/{PIPELINE_RUN_ID}

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial.

In [None]:
# Delete the Cloud Storage bucket
delete_bucket = False  # Set True for deletion

if delete_bucket:
    ! gsutil rm -r $BUCKET_URI

# Delete the locally generated files
! rm intro_pipeline.json
! rm pipeline_config.json