In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: pipeline control structures using the KFP SDK

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Fcontrol_flow_kfp.ipynb">
      <img width="32px" src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

## Overview

This notebook demostrates how to use [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/v2/) to build Vertex AI Pipelines using control structures.

Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

### Objective

In this tutorial, you learn how to use the KFP SDK, which uses loops and conditionals including nested examples, to build pipelines.

This tutorial uses the following Google Cloud ML services:

- Vertex AI Pipelines

The steps performed include:

- Create a KFP pipeline using control flow components
- Compile the KFP pipeline
- Execute the KFP pipeline using Vertex AI Pipelines

### Costs

This tutorial uses the following billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Get started


### Install Vertex AI SDK for Python and other required packages


In [None]:
! pip3 install --upgrade --quiet google-cloud-aiplatform  \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

if "google.colab" in sys.modules:

    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [None]:
import sys

if "google.colab" in sys.modules:

    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project. Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

#### Service Account

**If you don't know your service account**, create your service account using the `gcloud` command by executing the second cell below.

In [None]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account permission to read and write pipeline artifacts in the bucket created in the previous step. You only need to run these once per service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Import libraries and define constants

In [None]:
import json

from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import component

#### Vertex AI Pipelines constants

Setup up the following constants for Vertex AI Pipelines:

In [None]:
PIPELINE_ROOT = "{}/pipeline_root/control".format(BUCKET_URI)

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Define pipeline components

The following example defines three simple pipeline components:

- A component that generates a list of dicts and returns a stringified json.
(**Note**: This component requires an `import json` in the component function definition)
- A component that just prints its input string
- A component that does a 'coin flip' and returns `heads` or `tails`.

In [None]:
@component
def args_generator_op() -> str:
    import json

    return json.dumps(
        [{"cats": "1", "dogs": "2"}, {"cats": "10", "dogs": "20"}],
        sort_keys=True,
    )


@component
def print_op(msg: str):
    print(msg)


@component
def flip_coin_op() -> str:
    """Flip a coin and return heads or tails randomly."""
    import random

    result = "heads" if random.randint(0, 1) == 0 else "tails"
    return result

## Define a pipeline that uses control structures

The following example defines a pipeline component that demonstrates the use of `dsl.Condition` and `dsl.ParallelFor`.

The default value of `json_string` is a nested JSON list converted to a string. According to the pipeline definition, the loop and conditional expressions process this string as a list, and access list items and sub-items.
The same holds for the list output by the `args_generator_op`.

In [None]:
@dsl.pipeline(
    name="control",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    json_string: str = json.dumps(
        [
            {
                "snakes": "anaconda",
                "lizards": "anole",
                "bunnies": [{"cottontail": "bugs"}, {"cottontail": "thumper"}],
            },
            {
                "snakes": "cobra",
                "lizards": "gecko",
                "bunnies": [{"cottontail": "roger"}],
            },
            {
                "snakes": "boa",
                "lizards": "iguana",
                "bunnies": [
                    {"cottontail": "fluffy"},
                    {"fuzzy_lop": "petunia", "cottontail": "peter"},
                ],
            },
        ],
        sort_keys=True,
    )
):

    flip1 = flip_coin_op()

    with dsl.Condition(
        flip1.output != "no-such-result", name="alwaystrue"
    ):  # always true

        args_generator = args_generator_op()
        with dsl.ParallelFor(args_generator.output) as item:
            print_op(msg=json_string)

            with dsl.Condition(flip1.output == "heads", name="heads"):
                print_op(msg=item.cats)

            with dsl.Condition(flip1.output == "tails", name="tails"):
                print_op(msg=item.dogs)

    with dsl.ParallelFor(json_string) as item:
        with dsl.Condition(item.snakes == "boa", name="snakes"):
            print_op(msg=item.snakes)
            print_op(msg=item.lizards)
            print_op(msg=item.bunnies)

    # it is possible to access sub-items
    with dsl.ParallelFor(json_string) as item:
        with dsl.ParallelFor(item.bunnies) as item_bunnies:
            print_op(msg=item_bunnies.cottontail)

## Compile the pipeline

Next, compile the pipeline.

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="control_pipeline.yaml"
)

## Run the pipeline

Next, run the pipeline.

In [None]:
DISPLAY_NAME = "control"

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="control_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
)

job.run()

Click on the generated link to see your run in the Cloud Console.

<!-- It should look something like this as it is running:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a> -->

In the Google Cloud Console, you can click on the pipeline DAG nodes to expand or collapse them. Here's a partially-expanded view of the DAG (click image to see larger version).

<a href="https://storage.googleapis.com/amy-jo/images/mp/control_flow_dag.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/control_flow_dag.png" width="95%"/></a>

You can see, for example, that the 'heads' condition passed, and the 'tails' condition failed as expected.

# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial.

In [None]:
delete_bucket = False

# Delete the pipeline job
job.delete()

# Delete the locally generated files
! rm control_pipeline.yaml

if delete_bucket:
    ! gsutil rm -r $BUCKET_URI