notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb

{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "d2c62b00-03d1-4d9e-beb2-f9961452338a", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "# Copyright 2022 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "id": "6d7696ef-61d7-4c79-9eb2-281079b3802e", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "# Run Dataproc Templates from Vertex AI Pipelines\n", "<table align=\"left\">\n", "<td>\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb\">\n", " <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fgeneric_notebook%2Fvertex_pipeline_pyspark.ipynb\">\n", " <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb\">\n", " <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/generic_notebook/vertex_pipeline_pyspark.ipynb\">\n", " <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n", " </a>\n", "</td>\n", "</table>" ] }, { "cell_type": "markdown", "id": "4fa95de2", "metadata": {}, "source": [ "## Overview\n", "\n", "This notebook shows how to build a Vertex AI Pipeline to run a Dataproc Template using the DataprocPySparkBatchOp component." ] }, { "cell_type": "markdown", "id": "bf94dbe6-3ea2-4e26-90db-0da6c5a008ab", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### References\n", "\n", "- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n", "- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n", "- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n", "- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n", "\n", "This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. 
\n", "Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account. " ] }, { "cell_type": "markdown", "id": "3cd82da5-c46e-4067-b810-93ede117cdda", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Permissions\n", "\n", "Make sure that the service account used to run the notebook has the following roles:\n", "\n", "- roles/aiplatform.serviceAgent\n", "- roles/aiplatform.customCodeServiceAgent\n", "- roles/storage.objectCreator\n", "- roles/storage.objectViewer\n", "- roles/dataproc.editor\n", "- roles/dataproc.worker" ] }, { "cell_type": "markdown", "id": "6a2de52d-7099-4f03-becc-9886fa82b130", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Install the required packages" ] }, { "cell_type": "code", "execution_count": null, "id": "b3742b9a-143d-49fc-b43c-e56179c7f0f2", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import os\n", "\n", "# Google Cloud notebooks requires dependencies to be installed with '--user'\n", "! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q" ] }, { "cell_type": "markdown", "id": "ed5cc9ee-1137-4fb9-9a7c-a49cd8bc1ae1", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages." ] }, { "cell_type": "code", "execution_count": null, "id": "01c19b5e-e7d9-416f-ad12-edc19b6877e6", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import os\n", "\n", "if not os.getenv(\"IS_TESTING\"):\n", " import IPython\n", "\n", " app = IPython.Application.instance()\n", " app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "id": "4e1af72a-1fcc-4495-b710-58eb950a1d45", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Import dependencies" ] }, { "cell_type": "code", "execution_count": null, "id": "c7aede36-3a0a-43f7-8e4c-db2d8087e289", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import os\n", "import google.cloud.aiplatform as aiplatform\n", "\n", "try:\n", " from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp\n", "except ModuleNotFoundError:\n", " from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp\n", " \n", "from kfp import dsl\n", "from kfp import compiler\n", "from datetime import datetime\n", "from pathlib import Path" ] }, { "cell_type": "markdown", "id": "05c7aa35-f539-4f6a-a243-3e0799c47499", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Change working directory to the Dataproc Templates python folder" ] }, { "cell_type": "markdown", "id": "a46140a7", "metadata": {}, "source": [ "Uncomment & Run this cell if you are running from Colab" ] }, { "cell_type": "code", "execution_count": null, "id": "4a34e63a", "metadata": {}, "outputs": [], "source": [ "# !git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git\n", "# !mv /content/dataproc-templates/notebooks/util /content/\n", "# !mv /content/dataproc-templates/python/ /content/" ] }, { "cell_type": "code", "execution_count": null, "id": "d6e0e80a-e8b7-4e54-9f99-b7874e164978", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "cur_path = Path(os.getcwd())\n", "WORKING_DIRECTORY = os.path.join(cur_path.parent.parent ,'python')\n", "\n", "# If the above code doesn't fetches the correct path please\n", "# provide complete path to python folder in your 
dataproc \n", "# template repo which you cloned \n", "\n", "# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/python/\"\n", "print(WORKING_DIRECTORY)" ] }, { "cell_type": "code", "execution_count": null, "id": "69752c22", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%cd $WORKING_DIRECTORY" ] }, { "cell_type": "markdown", "id": "e7dc50fd-c262-4308-804e-298e62af14b4", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Set Google Cloud properties" ] }, { "cell_type": "code", "execution_count": null, "id": "f010bef0-2d9f-430a-b89c-1875b1647c02", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "get_project_id = ! gcloud config list --format 'value(core.project)' 2>/dev/null\n", "\n", "PROJECT_ID = get_project_id[0]\n", "REGION = \"<region>\"\n", "GCS_STAGING_LOCATION = \"<gs://bucket>\"\n", "SUBNET = \"projects/<project>/regions/<region>/subnetworks/<subnet>\"\n", "DATAPROC_SERVICE_ACCOUNT = \"\" #eg. test@project_id.iam.gserviceaccount.com" ] }, { "cell_type": "markdown", "id": "0ac30fae-1583-462e-a7a9-61f2736dbc5a", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Build Dataproc Templates python package" ] }, { "cell_type": "code", "execution_count": null, "id": "cfaa8c93-5e69-48fc-984a-f4fa3b28519b", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "PACKAGE_EGG_FILE = \"dist/dataproc_templates_distribution.egg\"\n", "! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fe0f7ad9-7d01-4496-b08c-f9e69775d0c1", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Copy package to the Cloud Storage bucket\n", "\n", "For this, make sure that the service account used to run the notebook has the following roles:\n", " - roles/storage.objectCreator\n", " - roles/storage.objectViewer" ] }, { "cell_type": "code", "execution_count": null, "id": "261867cf-45fc-4940-8db8-e079edcf9cb3", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "! gsutil cp main.py $GCS_STAGING_LOCATION/\n", "! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dist/" ] }, { "cell_type": "markdown", "id": "07c6d386-e37e-4f7e-8376-f0f0ee6861a9", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Set Dataproc Templates properties" ] }, { "cell_type": "code", "execution_count": null, "id": "63bc60ad-a0ab-4a0d-83ae-4f7af8a86498", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n", "MAIN_PYTHON_FILE = GCS_STAGING_LOCATION + \"/main.py\"\n", "PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + \"/dist/dataproc_templates_distribution.egg\"]\n", "JARS = [\"gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar\"]\n", "BATCH_ID = \"dataproc-templates-\" + datetime.now().strftime(\"%Y%m%d%H%M%S\")" ] }, { "cell_type": "markdown", "id": "5d33b841-0538-491f-b686-38ab4a01ad1e", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Choose template and set template arguments" ] }, { "cell_type": "markdown", "id": "fc6dcb79-3a71-4cd5-a67e-1783d2456545", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "GCSTOBIGQUERY is chosen in this notebook as an example. \n", "Check the arguments in the template's documentation. 
" ] }, { "cell_type": "code", "execution_count": null, "id": "18574192-8d4d-43c8-98e5-f3d613c98fab", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "TEMPLATE_SPARK_ARGS = [\n", "\"--template=GCSTOBIGQUERY\",\n", "\"--gcs.bigquery.input.format=<format>\",\n", "\"--gcs.bigquery.input.location=<gs://bucket/path>\",\n", "\"--gcs.bigquery.output.dataset=<dataset>\",\n", "\"--gcs.bigquery.output.table=<table>\",\n", "\"--gcs.bigquery.temp.bucket.name=<bucket>\"\n", "]" ] }, { "cell_type": "markdown", "id": "9896c320-f73e-40e8-9aab-cb519de241f3", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "### Build pipeline and run Dataproc Template on Vertex AI Pipelines\n", "\n", "For this, make sure that the service account used to run the notebook has the following roles:\n", " - roles/dataproc.editor\n", " - roles/dataproc.worker" ] }, { "cell_type": "code", "execution_count": null, "id": "cfb58c14-c691-4e7f-982c-fb74f4820b59", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "aiplatform.init(project=PROJECT_ID, staging_bucket=GCS_STAGING_LOCATION)\n", "\n", "@dsl.pipeline(\n", " name=\"dataproc-templates-pyspark\",\n", " description=\"An example pipeline that uses DataprocPySparkBatchOp to run a PySpark Dataproc Template batch workload\",\n", ")\n", "def pipeline(\n", " batch_id: str = BATCH_ID,\n", " project_id: str = PROJECT_ID,\n", " location: str = REGION,\n", " main_python_file_uri: str = MAIN_PYTHON_FILE,\n", " python_file_uris: list = PYTHON_FILE_URIS,\n", " jar_file_uris: list = JARS,\n", " subnetwork_uri: str = SUBNET,\n", " service_account: str = DATAPROC_SERVICE_ACCOUNT,\n", " args: list = TEMPLATE_SPARK_ARGS,\n", "):\n", "\n", " _ = DataprocPySparkBatchOp(\n", " project=project_id,\n", " location=location,\n", " batch_id=batch_id,\n", " main_python_file_uri=main_python_file_uri,\n", " python_file_uris=python_file_uris,\n", " jar_file_uris=jar_file_uris,\n", " subnetwork_uri=subnetwork_uri,\n", " service_account=service_account,\n", " runtime_config_version=\"1.1\", # issue 665\n", " args=args,\n", " )\n", "\n", "compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n", "\n", "pipeline = aiplatform.PipelineJob(\n", " display_name=\"pipeline\",\n", " template_path=\"pipeline.json\",\n", " pipeline_root=PIPELINE_ROOT,\n", " location=REGION,\n", " enable_caching=False,\n", ")\n", "\n", "# run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using\n", "# specific service account instead of default service account \n", "# eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')\n", "pipeline.run()" ] } ], "metadata": { "environment": { "kernel": "python3", "name": "common-cpu.m91", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/base-cpu:m91" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.9" }, "vscode": { "interpreter": { "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" } } }, "nbformat": 4, "nbformat_minor": 5 }