{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "d2c62b00-03d1-4d9e-beb2-f9961452338a",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# Copyright 2022 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"id": "6d7696ef-61d7-4c79-9eb2-281079b3802e",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"# Run Dataproc Templates from Vertex AI Pipelines\n",
"<table align=\"left\">\n",
"<td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb\">\n",
" <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fgeneric_notebook%2Fvertex_pipeline_pyspark.ipynb\">\n",
" <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb\">\n",
" <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb\">\n",
" <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n",
" </a>\n",
"</td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"id": "4fa95de2",
"metadata": {},
"source": [
"## Overview\n",
"\n",
"This notebook shows how to build a Vertex AI Pipeline to run a Dataproc Template using the DataprocPySparkBatchOp component."
]
},
{
"cell_type": "markdown",
"id": "bf94dbe6-3ea2-4e26-90db-0da6c5a008ab",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### References\n",
"\n",
"- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n",
"- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n",
"- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n",
"- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n",
"\n",
"This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. \n",
"Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account. "
]
},
{
"cell_type": "markdown",
"id": "3cd82da5-c46e-4067-b810-93ede117cdda",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Permissions\n",
"\n",
"Make sure that the service account used to run the notebook has the following roles:\n",
"\n",
"- roles/aiplatform.serviceAgent\n",
"- roles/aiplatform.customCodeServiceAgent\n",
"- roles/storage.objectCreator\n",
"- roles/storage.objectViewer\n",
"- roles/dataproc.editor\n",
"- roles/dataproc.worker"
]
},
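{
"cell_type": "markdown",
"id": "f1a2b3c4-d5e6-4f78-9a0b-1c2d3e4f5a6b",
"metadata": {},
"source": [
"If any of these roles are missing, a project owner can grant them with `gcloud`. The sketch below is optional and assumes you have permission to modify the project's IAM policy; `<project-id>` and `<sa-email>` are placeholders."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0a1b2c3d-4e5f-4a6b-8c7d-9e0f1a2b3c4d",
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: grant the required roles to the notebook's service account.\n",
"# <project-id> and <sa-email> are placeholders; replace them before uncommenting.\n",
"# SA_MEMBER = \"serviceAccount:<sa-email>\"\n",
"# for role in [\"roles/aiplatform.serviceAgent\", \"roles/aiplatform.customCodeServiceAgent\",\n",
"#              \"roles/storage.objectCreator\", \"roles/storage.objectViewer\",\n",
"#              \"roles/dataproc.editor\", \"roles/dataproc.worker\"]:\n",
"#     ! gcloud projects add-iam-policy-binding <project-id> --member=$SA_MEMBER --role=$role"
]
},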
{
"cell_type": "markdown",
"id": "6a2de52d-7099-4f03-becc-9886fa82b130",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Install the required packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b3742b9a-143d-49fc-b43c-e56179c7f0f2",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Google Cloud notebooks requires dependencies to be installed with '--user'\n",
"! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q"
]
},
{
"cell_type": "markdown",
"id": "ed5cc9ee-1137-4fb9-9a7c-a49cd8bc1ae1",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "01c19b5e-e7d9-416f-ad12-edc19b6877e6",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"import os\n",
"\n",
"if not os.getenv(\"IS_TESTING\"):\n",
" import IPython\n",
"\n",
" app = IPython.Application.instance()\n",
" app.kernel.do_shutdown(True)"
]
},
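{
"cell_type": "markdown",
"id": "1b2c3d4e-5f6a-4b7c-8d9e-0f1a2b3c4d5e",
"metadata": {},
"source": [
"After the kernel restarts, you can optionally confirm that the new kernel sees the packages. This check uses only the standard library:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2c3d4e5f-6a7b-4c8d-9e0f-1a2b3c4d5e6f",
"metadata": {},
"outputs": [],
"source": [
"from importlib.metadata import version\n",
"\n",
"# Confirm the packages installed above resolve in the restarted kernel.\n",
"for pkg in (\"kfp\", \"google-cloud-pipeline-components\"):\n",
"    print(pkg, version(pkg))"
]
},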
{
"cell_type": "markdown",
"id": "4e1af72a-1fcc-4495-b710-58eb950a1d45",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Import dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c7aede36-3a0a-43f7-8e4c-db2d8087e289",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"import os\n",
"import google.cloud.aiplatform as aiplatform\n",
"\n",
"try:\n",
" from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp\n",
"except ModuleNotFoundError:\n",
" from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp\n",
" \n",
"from kfp import dsl\n",
"from kfp import compiler\n",
"from datetime import datetime\n",
"from pathlib import Path"
]
},
{
"cell_type": "markdown",
"id": "05c7aa35-f539-4f6a-a243-3e0799c47499",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Change working directory to the Dataproc Templates python folder"
]
},
{
"cell_type": "markdown",
"id": "a46140a7",
"metadata": {},
"source": [
"Uncomment & Run this cell if you are running from Colab"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4a34e63a",
"metadata": {},
"outputs": [],
"source": [
"# !git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git\n",
"# !mv /content/dataproc-templates/notebooks/util /content/\n",
"# !mv /content/dataproc-templates/python/ /content/"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d6e0e80a-e8b7-4e54-9f99-b7874e164978",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"cur_path = Path(os.getcwd())\n",
"WORKING_DIRECTORY = os.path.join(cur_path.parent.parent ,'python')\n",
"\n",
"# If the above code doesn't fetches the correct path please\n",
"# provide complete path to python folder in your dataproc \n",
"# template repo which you cloned \n",
"\n",
"# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/python/\"\n",
"print(WORKING_DIRECTORY)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "69752c22",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%cd $WORKING_DIRECTORY"
]
},
{
"cell_type": "markdown",
"id": "e7dc50fd-c262-4308-804e-298e62af14b4",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Set Google Cloud properties"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f010bef0-2d9f-430a-b89c-1875b1647c02",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"get_project_id = ! gcloud config list --format 'value(core.project)' 2>/dev/null\n",
"\n",
"PROJECT_ID = get_project_id[0]\n",
"REGION = \"<region>\"\n",
"GCS_STAGING_LOCATION = \"<gs://bucket>\"\n",
"SUBNET = \"projects/<project>/regions/<region>/subnetworks/<subnet>\"\n",
"DATAPROC_SERVICE_ACCOUNT = \"\" #eg. test@project_id.iam.gserviceaccount.com"
]
},
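{
"cell_type": "markdown",
"id": "3d4e5f6a-7b8c-4d9e-a0f1-2b3c4d5e6f7a",
"metadata": {},
"source": [
"A quick sanity check helps catch placeholders that were left unreplaced above (a minimal sketch; it only looks for leftover angle brackets):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4e5f6a7b-8c9d-4e0f-b1a2-3c4d5e6f7a8b",
"metadata": {},
"outputs": [],
"source": [
"# Fail fast if a \"<...>\" placeholder was left in any of the properties above.\n",
"for name, value in {\"REGION\": REGION,\n",
"                    \"GCS_STAGING_LOCATION\": GCS_STAGING_LOCATION,\n",
"                    \"SUBNET\": SUBNET}.items():\n",
"    assert \"<\" not in value, f\"Replace the placeholder in {name}: {value}\"\n",
"\n",
"print(\"PROJECT_ID:\", PROJECT_ID)"
]
},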
{
"cell_type": "markdown",
"id": "0ac30fae-1583-462e-a7a9-61f2736dbc5a",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Build Dataproc Templates python package"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cfaa8c93-5e69-48fc-984a-f4fa3b28519b",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"PACKAGE_EGG_FILE = \"dist/dataproc_templates_distribution.egg\"\n",
"! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE"
]
},
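{
"cell_type": "markdown",
"id": "5f6a7b8c-9d0e-4f1a-82b3-c4d5e6f7a8b9",
"metadata": {},
"source": [
"Optionally confirm that the build produced the egg file where the next step expects it:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6a7b8c9d-0e1f-4a2b-93c4-d5e6f7a8b9c0",
"metadata": {},
"outputs": [],
"source": [
"# The build step above should have written the egg to dist/.\n",
"! ls -lh $PACKAGE_EGG_FILE"
]
},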
{
"attachments": {},
"cell_type": "markdown",
"id": "fe0f7ad9-7d01-4496-b08c-f9e69775d0c1",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Copy package to the Cloud Storage bucket\n",
"\n",
"For this, make sure that the service account used to run the notebook has the following roles:\n",
" - roles/storage.objectCreator\n",
" - roles/storage.objectViewer"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "261867cf-45fc-4940-8db8-e079edcf9cb3",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"! gsutil cp main.py $GCS_STAGING_LOCATION/\n",
"! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dist/"
]
},
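{
"cell_type": "markdown",
"id": "7b8c9d0e-1f2a-4b3c-a4d5-e6f7a8b9c0d1",
"metadata": {},
"source": [
"You can verify that both artifacts landed in the staging bucket:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8c9d0e1f-2a3b-4c4d-b5e6-f7a8b9c0d1e2",
"metadata": {},
"outputs": [],
"source": [
"# List the uploaded driver and package to confirm the copy succeeded.\n",
"! gsutil ls $GCS_STAGING_LOCATION/main.py $GCS_STAGING_LOCATION/dist/"
]
},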
{
"cell_type": "markdown",
"id": "07c6d386-e37e-4f7e-8376-f0f0ee6861a9",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Set Dataproc Templates properties"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "63bc60ad-a0ab-4a0d-83ae-4f7af8a86498",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n",
"MAIN_PYTHON_FILE = GCS_STAGING_LOCATION + \"/main.py\"\n",
"PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + \"/dist/dataproc_templates_distribution.egg\"]\n",
"JARS = [\"gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar\"]\n",
"BATCH_ID = \"dataproc-templates-\" + datetime.now().strftime(\"%Y%m%d%H%M%S\")"
]
},
{
"cell_type": "markdown",
"id": "5d33b841-0538-491f-b686-38ab4a01ad1e",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Choose template and set template arguments"
]
},
{
"cell_type": "markdown",
"id": "fc6dcb79-3a71-4cd5-a67e-1783d2456545",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"GCSTOBIGQUERY is chosen in this notebook as an example. \n",
"Check the arguments in the template's documentation. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "18574192-8d4d-43c8-98e5-f3d613c98fab",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"TEMPLATE_SPARK_ARGS = [\n",
"\"--template=GCSTOBIGQUERY\",\n",
"\"--gcs.bigquery.input.format=<format>\",\n",
"\"--gcs.bigquery.input.location=<gs://bucket/path>\",\n",
"\"--gcs.bigquery.output.dataset=<dataset>\",\n",
"\"--gcs.bigquery.output.table=<table>\",\n",
"\"--gcs.bigquery.temp.bucket.name=<bucket>\"\n",
"]"
]
},
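{
"cell_type": "markdown",
"id": "9d0e1f2a-3b4c-4d5e-86f7-a8b9c0d1e2f3",
"metadata": {},
"source": [
"As with the Google Cloud properties, a small sanity check catches placeholders left in the template arguments:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0e1f2a3b-4c5d-4e6f-97a8-b9c0d1e2f3a4",
"metadata": {},
"outputs": [],
"source": [
"# Every \"<...>\" placeholder in TEMPLATE_SPARK_ARGS must be replaced before running.\n",
"leftover = [arg for arg in TEMPLATE_SPARK_ARGS if \"<\" in arg]\n",
"assert not leftover, f\"Replace the placeholders in: {leftover}\""
]
},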
{
"cell_type": "markdown",
"id": "9896c320-f73e-40e8-9aab-cb519de241f3",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"### Build pipeline and run Dataproc Template on Vertex AI Pipelines\n",
"\n",
"For this, make sure that the service account used to run the notebook has the following roles:\n",
" - roles/dataproc.editor\n",
" - roles/dataproc.worker"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cfb58c14-c691-4e7f-982c-fb74f4820b59",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"aiplatform.init(project=PROJECT_ID, staging_bucket=GCS_STAGING_LOCATION)\n",
"\n",
"@dsl.pipeline(\n",
" name=\"dataproc-templates-pyspark\",\n",
" description=\"An example pipeline that uses DataprocPySparkBatchOp to run a PySpark Dataproc Template batch workload\",\n",
")\n",
"def pipeline(\n",
" batch_id: str = BATCH_ID,\n",
" project_id: str = PROJECT_ID,\n",
" location: str = REGION,\n",
" main_python_file_uri: str = MAIN_PYTHON_FILE,\n",
" python_file_uris: list = PYTHON_FILE_URIS,\n",
" jar_file_uris: list = JARS,\n",
" subnetwork_uri: str = SUBNET,\n",
" service_account: str = DATAPROC_SERVICE_ACCOUNT,\n",
" args: list = TEMPLATE_SPARK_ARGS,\n",
"):\n",
"\n",
" _ = DataprocPySparkBatchOp(\n",
" project=project_id,\n",
" location=location,\n",
" batch_id=batch_id,\n",
" main_python_file_uri=main_python_file_uri,\n",
" python_file_uris=python_file_uris,\n",
" jar_file_uris=jar_file_uris,\n",
" subnetwork_uri=subnetwork_uri,\n",
" service_account=service_account,\n",
" runtime_config_version=\"1.1\", # issue 665\n",
" args=args,\n",
" )\n",
"\n",
"compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n",
"\n",
"pipeline = aiplatform.PipelineJob(\n",
" display_name=\"pipeline\",\n",
" template_path=\"pipeline.json\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
" location=REGION,\n",
" enable_caching=False,\n",
")\n",
"\n",
"# run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using\n",
"# specific service account instead of default service account \n",
"# eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')\n",
"pipeline.run()"
]
},
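{
"cell_type": "markdown",
"id": "1f2a3b4c-5d6e-4f7a-88b9-c0d1e2f3a4b5",
"metadata": {},
"source": [
"Because `run()` blocks until the batch workload finishes, the job's terminal state can be inspected afterwards (assuming `pipeline_job` from the cell above):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2a3b4c5d-6e7f-4a8b-99c0-d1e2f3a4b5c6",
"metadata": {},
"outputs": [],
"source": [
"# run() blocks until completion, so the terminal state is available here.\n",
"print(pipeline_job.resource_name)\n",
"print(pipeline_job.state)"
]
}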
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cpu.m91",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m91"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel) (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
},
"vscode": {
"interpreter": {
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}