data-analytics-demos/bigquery-data-governance/colab-enterprise/01-Spark-Data-Lineage.ipynb (320 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "aXGUW2_zdrDi" }, "source": [ "### <font color='#4285f4'>Overview</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "X9dY--R8drDj" }, "source": [ "**Overview**: Create a Dataproc cluster that captures data lineage. The Spark job creates two tables in the enriched zone: order_header_spark_lineage and order_detail_spark_lineage. You can view the data lineage in BigQuery and compare it to the lineage of the order_header and order_detail tables, which were processed with a BigQuery Spark stored procedure (which currently does not capture data lineage).\n", "\n", "\n", "**Process Flow**:\n", "1. **Create a Dataproc cluster** with the following property: `--properties=\"dataproc:dataproc.lineage.enabled=true\"`. This enables data lineage tracking on the cluster.\n", "\n", "2. **Run the Spark job** with the following properties set. You need to provide the `project-id` you want lineage sent to and the `appName` (which can be any descriptive name).\n", " ```\n", " --properties=spark.openlineage.namespace=project-id,spark.openlineage.appName=OrderEnricher\n", " ```\n", "\n", "3. **Delete the cluster.**\n", "\n", "\n", "Notes:\n", "* This notebook uses a Dataproc cluster. You can also use Dataproc Serverless. 
You might need to set the metadata tags on your compute before creating the serverless job.\n", "\n", "Cost:\n", "* Approximate cost: About $1\n", "\n", "Author:\n", "* Adam Paternostro" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "C3_ajGjEdrDk" }, "outputs": [], "source": [ "# Architecture Diagram\n", "from IPython.display import Image\n", "Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Spark-Lineage.png', width=1200)" ] }, { "cell_type": "markdown", "metadata": { "id": "1euw-NGIdrDk" }, "source": [ "### <font color='#4285f4'>Video Walkthrough</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "uK2mBsSndrDk" }, "source": [ "[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Spark-Data-Lineage.mp4)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "DM288FKhdrDk" }, "outputs": [], "source": [ "from IPython.display import HTML\n", "\n", "HTML(\"\"\"\n", "<video width=\"800\" height=\"600\" controls>\n", " <source src=\"https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Spark-Data-Lineage.mp4\" type=\"video/mp4\">\n", " Your browser does not support the video tag.\n", "</video>\n", "\"\"\")" ] }, { "cell_type": "markdown", "metadata": { "id": "HMsUvoF4BP7Y" }, "source": [ "### <font color='#4285f4'>License</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "jQgQkbOvj55d" }, "source": [ "```\n", "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5MaWM6H5i6rX" }, "outputs": [], "source": [ "# PIP Installs (if necessary)\n", "import sys\n", "\n", "# !{sys.executable} -m pip install REPLACE-ME" ] }, { "cell_type": "markdown", "metadata": { "id": "c51M89g0Ejmz" }, "source": [ "### <font color='#4285f4'>Run Dataproc using gcloud Commands</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "7Uc8xxyse4ZO" }, "source": [ "#### Create the Dataproc cluster\n", "- Creates the cluster and turns on lineage capture\n", "- properties=dataproc:dataproc.lineage.enabled=true" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "U4A_7n4SEPNO" }, "outputs": [], "source": [ "!gcloud dataproc clusters create \"my-cluster\" \\\n", " --project=\"${project_id}\" \\\n", " --region=\"${dataproc_region}\" \\\n", " --num-masters=1 \\\n", " --bucket=\"${governed_data_code_bucket}\" \\\n", " --temp-bucket=\"${governed_data_code_bucket}\" \\\n", " --master-machine-type=\"n1-standard-4\" \\\n", " --worker-machine-type=\"n1-standard-4\" \\\n", " --num-workers=2 \\\n", " --image-version=\"2.1.75-debian11\" \\\n", " --subnet=\"dataproc-subnet\" \\\n", " --service-account=\"dataproc-service-account@${project_id}.iam.gserviceaccount.com\" \\\n", " --properties=\"dataproc:dataproc.lineage.enabled=true\" \\\n", " --no-address\n" ] }, { "cell_type": "markdown", "metadata": { "id": "px4gx17xemNP" }, "source": [ "#### Run the PySpark job\n", "- Runs the job and passes in the lineage parameters\n", "- properties=\n", " - spark.openlineage.namespace=${project_id}\n", " - spark.openlineage.appName=OrderEnricher" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0MMwN54beOYh" }, "outputs": [], "source": [ "!gcloud dataproc jobs submit pyspark \\\n", " --project=\"${project_id}\" 
\\\n", " --region=\"${dataproc_region}\" \\\n", " --cluster=\"my-cluster\" \\\n", " --properties=spark.openlineage.namespace=${project_id},spark.openlineage.appName=OrderEnricher \\\n", " gs://${governed_data_code_bucket}/dataproc/transform_order_pyspark.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Recreate the sales table from the Spark-generated tables so that it shows Spark data lineage" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bigquery\n", "\n", "CREATE OR REPLACE TABLE `${project_id}.${bigquery_governed_data_curated_dataset}.sales` AS\n", "SELECT p.product_name,\n", " p.product_description,\n", " pd.product_category_name,\n", " pd.product_category_description,\n", " oh.region,\n", " oh.order_datetime,\n", " od.price,\n", " od.quantity,\n", " c.*\n", " FROM `${project_id}.${bigquery_governed_data_enriched_dataset}.order_header_spark_lineage` oh\n", " LEFT JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.order_detail_spark_lineage` od\n", " ON oh.order_id=od.order_id\n", " INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.product` AS p\n", " ON od.product_id=p.product_id\n", " INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.product_category` AS pd\n", " ON pd.product_category_id=p.product_category_id\n", " INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.customer` as c \n", " ON c.customer_id=oh.customer_id;" ] }, { "cell_type": "markdown", "metadata": { "id": "42IxhtRRrvR-" }, "source": [ "### <font color='#4285f4'>Clean Up</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "gaRnEKd6ehOQ" }, "source": [ "#### Delete the cluster\n", "Deletes the Dataproc cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6lF2Z7skFbvf" }, "outputs": [], "source": [ "!gcloud dataproc clusters delete \"my-cluster\" \\\n", " --project=\"${project_id}\" \\\n", " --region=\"${dataproc_region}\"" ] }, { 
"cell_type": "markdown", "metadata": { "id": "ASQ2BPisXDA0" }, "source": [ "### <font color='#4285f4'>Reference Links</font>\n" ] }, { "cell_type": "markdown", "metadata": { "id": "rTY6xJdZ3ul8" }, "source": [ "- [Enable data lineage in Dataproc](https://cloud.google.com/dataproc/docs/guides/lineage)" ] } ], "metadata": { "colab": { "collapsed_sections": [ "aXGUW2_zdrDi", "1euw-NGIdrDk", "HMsUvoF4BP7Y", "42IxhtRRrvR-", "ASQ2BPisXDA0" ], "name": "Template", "private_outputs": true, "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }