In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Run Dataproc Templates from Vertex AI Pipelines
<table align="left">
<td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb">
        <img src="../images/colab-logo-32px.png" alt="Colab logo" />Run in Colab
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fgeneric_notebook%2Fvertex_pipeline_pyspark.ipynb">
        <img src="../images/colab-enterprise-logo-32px.png" alt="GCP Colab Enterprise logo" />Run in Colab Enterprise
    </a>
</td>
<td>
    <a href="https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb">
        <img src="../images/github-logo-32px.png" alt="GitHub logo" />View on GitHub
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb">
        <img src="../images/vertexai.jpg" alt="Vertex AI logo" />Open in Vertex AI Workbench
    </a>
</td>
</table>

## Overview

This notebook shows how to build a Vertex AI Pipeline to run a Dataproc Template using the DataprocPySparkBatchOp component.

#### References

- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)
- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)
- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)
- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

This notebook is built to run on a Vertex AI User-Managed Notebook using the default Compute Engine Service Account.  
Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to set up a different Service Account.  

#### Permissions

Make sure that the service account used to run the notebook has the following roles:

- roles/aiplatform.serviceAgent
- roles/aiplatform.customCodeServiceAgent
- roles/storage.objectCreator
- roles/storage.objectViewer
- roles/dataproc.editor
- roles/dataproc.worker

#### Install the required packages

In [None]:
import os

# Google Cloud notebooks requires dependencies to be installed with '--user'
! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q

Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

# Restart the kernel so that the packages installed above can be imported.
# The restart is skipped when the IS_TESTING environment variable is set.
if not os.getenv("IS_TESTING"):
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

#### Import dependencies

In [None]:
import os
import google.cloud.aiplatform as aiplatform

try:
    from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp
except ModuleNotFoundError:
    from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp
    
from kfp import dsl
from kfp import compiler
from datetime import datetime
from pathlib import Path

#### Change working directory to the Dataproc Templates python folder

Uncomment and run the following cell if you are running from Colab.

In [None]:
# !git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
# !mv /content/dataproc-templates/notebooks/util /content/
# !mv /content/dataproc-templates/python/ /content/

In [None]:
cur_path = Path(os.getcwd())

# Default guess: the notebook runs from <repo>/notebooks/<folder>/, so the
# Dataproc Templates python folder is two levels up.
WORKING_DIRECTORY = os.path.join(cur_path.parent.parent, "python")

# Prefer the first candidate that really holds main.py and setup.py (both are
# used by later cells). Candidates, in priority order:
#   1. <repo>/python  - notebook opened from inside the cloned repository
#   2. ./python       - folder moved next to the notebook (Colab cell above)
#   3. .              - this cell re-run after the %cd cell below
for _candidate in (cur_path.parent.parent / "python", cur_path / "python", cur_path):
    if (_candidate / "main.py").is_file() and (_candidate / "setup.py").is_file():
        WORKING_DIRECTORY = str(_candidate)
        break

# If the path printed below is not correct, uncomment the next line and
# provide the complete path to the python folder of the dataproc-templates
# repository you cloned.

# WORKING_DIRECTORY = "/home/jupyter/dataproc-templates/python/"
print(WORKING_DIRECTORY)

In [None]:
# Switch into the python folder: later cells use paths relative to it
# (./setup.py, main.py and the dist/ output).
%cd $WORKING_DIRECTORY

#### Set Google Cloud properties

In [None]:
get_project_id = ! gcloud config list --format 'value(core.project)' 2>/dev/null

PROJECT_ID = get_project_id[0]
REGION = "<region>"
GCS_STAGING_LOCATION = "<gs://bucket>"
SUBNET = "projects/<project>/regions/<region>/subnetworks/<subnet>"
DATAPROC_SERVICE_ACCOUNT = "" #eg. test@project_id.iam.gserviceaccount.com

#### Build Dataproc Templates python package

In [None]:
PACKAGE_EGG_FILE = "dist/dataproc_templates_distribution.egg"
! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE

#### Copy package to the Cloud Storage bucket

For this, make sure that the service account used to run the notebook has the following roles:
 - roles/storage.objectCreator
 - roles/storage.objectViewer

In [None]:
! gsutil cp main.py $GCS_STAGING_LOCATION/
! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dist/

#### Set Dataproc Templates properties

In [None]:
# Cloud Storage URIs derived from the staging location.
PIPELINE_ROOT = f"{GCS_STAGING_LOCATION}/pipeline_root/dataproc_pyspark"
MAIN_PYTHON_FILE = f"{GCS_STAGING_LOCATION}/main.py"
PYTHON_FILE_URIS = [f"{GCS_STAGING_LOCATION}/dist/dataproc_templates_distribution.egg"]

# Spark BigQuery connector jar handed to the batch workload.
JARS = ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"]

# The timestamp is taken when this cell runs, so re-run the cell to obtain a
# new batch id.
BATCH_ID = f"dataproc-templates-{datetime.now():%Y%m%d%H%M%S}"

#### Choose template and set template arguments

GCSTOBIGQUERY is chosen in this notebook as an example.  
Check the arguments in the template's documentation.  

In [None]:
# Arguments passed to the template; replace every <placeholder> with a real
# value before running the pipeline.
TEMPLATE_SPARK_ARGS = [
    "--template=GCSTOBIGQUERY",
    "--gcs.bigquery.input.format=<format>",
    "--gcs.bigquery.input.location=<gs://bucket/path>",
    "--gcs.bigquery.output.dataset=<dataset>",
    "--gcs.bigquery.output.table=<table>",
    "--gcs.bigquery.temp.bucket.name=<bucket>",
]

### Build pipeline and run Dataproc Template on Vertex AI Pipelines

For this, make sure that the service account used to run the notebook has the following roles:
 - roles/dataproc.editor
 - roles/dataproc.worker

In [None]:
# Initialise the Vertex AI SDK with the project and staging bucket set in the
# earlier cells.
aiplatform.init(project=PROJECT_ID, staging_bucket=GCS_STAGING_LOCATION)

# Pipeline definition. Every parameter defaults to the notebook variable of
# the same purpose, using the value that variable holds when this cell runs.
@dsl.pipeline(
    name="dataproc-templates-pyspark",
    description="An example pipeline that uses DataprocPySparkBatchOp to run a PySpark Dataproc Template batch workload",
)
def pipeline(
    batch_id: str = BATCH_ID,
    project_id: str = PROJECT_ID,
    location: str = REGION,
    main_python_file_uri: str = MAIN_PYTHON_FILE,
    python_file_uris: list = PYTHON_FILE_URIS,
    jar_file_uris: list = JARS,
    subnetwork_uri: str = SUBNET,
    service_account: str = DATAPROC_SERVICE_ACCOUNT,
    args: list = TEMPLATE_SPARK_ARGS,
):

    # Single step: forward the pipeline parameters to the PySpark batch
    # component. The returned task is not used, hence the `_` placeholder.
    _ = DataprocPySparkBatchOp(
        project=project_id,
        location=location,
        batch_id=batch_id,
        main_python_file_uri=main_python_file_uri,
        python_file_uris=python_file_uris,
        jar_file_uris=jar_file_uris,
        subnetwork_uri=subnetwork_uri,
        service_account=service_account,
        runtime_config_version="1.1", # runtime version pinned explicitly; see issue 665
        args=args,
    )

# Compile the pipeline function into a local pipeline.json file.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

# Create the pipeline job from the compiled file, with caching disabled.
# NOTE: from here on the name `pipeline` refers to the PipelineJob object and
# no longer to the function defined above.
pipeline = aiplatform.PipelineJob(
    display_name="pipeline",
    template_path="pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    enable_caching=False,
)

# The run() method has an optional parameter `service_account` which you can pass if you want to run the pipeline
# using a specific service account instead of the default service account,
# eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')
pipeline.run()