data-analytics-demos/bigquery-data-governance/colab-enterprise/01-Spark-Data-Lineage.ipynb (320 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "aXGUW2_zdrDi"
},
"source": [
"### <font color='#4285f4'>Overview</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "X9dY--R8drDj"
},
"source": [
"**Overview**: Create a dataproc cluster that will capture data lineage. The spark job will create two tables in the enriched zone order_header_spark_lineage and order_detail_spark_lineage. You can view the data lineage in BigQuery and you can also compare the lineage to the order_header and order_detail tables which were process with a BigQuery Spark stored procedure (which is currently not capturing data lineage).\n",
"\n",
"\n",
"**Process Flow**:\n",
"1. **Create a Dataproc cluster** with the following property: `--properties=\"dataproc:dataproc.lineage.enabled=true\"`. This enables data lineage tracking on the cluster.\n",
"\n",
"2. **Run the Spark job** with the following properties set. You need to provide the `project-id` you want lineage sent to and the `appName` (which can be any descriptive name).\n",
" ```\n",
" --properties=spark.openlineage.namespace=project-id,spark.openlineage.appName=OrderEnricher\n",
" ```\n",
"\n",
"3. **Delete the cluster.**\n",
"\n",
"\n",
"Notes:\n",
"* This notebook uses a Dataproc cluster. You can also use Dataproc serverless. You might need to set the metadata tags on your compute before creating the serverless job.\n",
"\n",
"Cost:\n",
"* Approximate cost: About $1\n",
"\n",
"Author:\n",
"* Adam Paternostro"
]
},
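{
"cell_type": "markdown",
"metadata": {},
"source": [
"The two job properties from the process flow above can also be assembled in Python before being passed to `gcloud`. The following is a minimal sketch, not part of the original demo: the helper name `build_lineage_properties` and the sample project id `my-project` are hypothetical, while the property keys are the ones this notebook uses.\n",
"\n",
"```python\n",
"# Hypothetical helper (an assumption, not part of the original notebook).\n",
"def build_lineage_properties(project_id, app_name):\n",
"    # Build the comma-separated value for the --properties flag of the job.\n",
"    properties = {\n",
"        'spark.openlineage.namespace': project_id,  # project that receives lineage\n",
"        'spark.openlineage.appName': app_name,      # any descriptive name\n",
"    }\n",
"    return ','.join(f'{key}={value}' for key, value in properties.items())\n",
"\n",
"\n",
"# 'my-project' is a placeholder project id.\n",
"print(build_lineage_properties('my-project', 'OrderEnricher'))\n",
"```\n",
"\n",
"The returned string can be passed as `--properties=<value>` when submitting the PySpark job."
]
},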
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "C3_ajGjEdrDk"
},
"outputs": [],
"source": [
"# Architecture Diagram\n",
"from IPython.display import Image\n",
"Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Spark-Lineage.png', width=1200)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "1euw-NGIdrDk"
},
"source": [
"### <font color='#4285f4'>Video Walkthrough</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "uK2mBsSndrDk"
},
"source": [
"[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Spark-Data-Lineage.mp4)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "DM288FKhdrDk"
},
"outputs": [],
"source": [
"from IPython.display import HTML\n",
"\n",
"HTML(\"\"\"\n",
"<video width=\"800\" height=\"600\" controls>\n",
" <source src=\"https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Spark-Data-Lineage.mp4\" type=\"video/mp4\">\n",
" Your browser does not support the video tag.\n",
"</video>\n",
"\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HMsUvoF4BP7Y"
},
"source": [
"### <font color='#4285f4'>License</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jQgQkbOvj55d"
},
"source": [
"```\n",
"# Copyright 2024 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "5MaWM6H5i6rX"
},
"outputs": [],
"source": [
"# PIP Installs (if necessary)\n",
"import sys\n",
"\n",
"# !{sys.executable} -m pip install REPLACE-ME"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "c51M89g0Ejmz"
},
"source": [
"### <font color='#4285f4'>Run Dataproc using gCloud Commands</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "7Uc8xxyse4ZO"
},
"source": [
"#### Create the Dataproc cluster\n",
"- Creates the cluster and turns on lineage (capture)\n",
"- properties=dataproc:dataproc.lineage.enabled=true"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "U4A_7n4SEPNO"
},
"outputs": [],
"source": [
"!gcloud dataproc clusters create \"my-cluster\" \\\n",
" --project=\"${project_id}\" \\\n",
" --region=\"${dataproc_region}\" \\\n",
" --num-masters=1 \\\n",
" --bucket=\"${governed_data_code_bucket}\" \\\n",
" --temp-bucket=\"${governed_data_code_bucket}\" \\\n",
" --master-machine-type=\"n1-standard-4\" \\\n",
" --worker-machine-type=\"n1-standard-4\" \\\n",
" --num-workers=2 \\\n",
" --image-version=\"2.1.75-debian11\" \\\n",
" --subnet=\"dataproc-subnet\" \\\n",
" --service-account=\"dataproc-service-account@${project_id}.iam.gserviceaccount.com\" \\\n",
" --properties=\"dataproc:dataproc.lineage.enabled=true\" \\\n",
" --no-address\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "px4gx17xemNP"
},
"source": [
"#### Runs the PySpark job\n",
"- Runs the job and passes in the lineage parameters\n",
"- properties=\n",
" - spark.openlineage.namespace=${project_id}\n",
" - spark.openlineage.appName=OrderEnricher"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "0MMwN54beOYh"
},
"outputs": [],
"source": [
"!gcloud dataproc jobs submit pyspark \\\n",
" --project=\"${project_id}\" \\\n",
" --region=\"${dataproc_region}\" \\\n",
" --cluster=\"my-cluster\" \\\n",
" --properties=spark.openlineage.namespace=${project_id},spark.openlineage.appName=OrderEnricher \\\n",
" gs://${governed_data_code_bucket}/dataproc/transform_order_pyspark.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Recreate the sales table using the Spark table to show Spark data lineage"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"CREATE OR REPLACE TABLE `${project_id}.${bigquery_governed_data_curated_dataset}.sales` AS\n",
"SELECT p.product_name,\n",
" p.product_description,\n",
" pd.product_category_name,\n",
" pd.product_category_description,\n",
" oh.region,\n",
" oh.order_datetime,\n",
" od.price,\n",
" od.quantity,\n",
" c.*\n",
" FROM `${project_id}.${bigquery_governed_data_enriched_dataset}.order_header_spark_lineage` oh\n",
" LEFT JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.order_detail_spark_lineage` od\n",
" ON oh.order_id=od.order_id\n",
" INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.product` AS p\n",
" ON od.product_id=p.product_id\n",
" INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.product_category` AS pd\n",
" ON pd.product_category_id=p.product_category_id\n",
" INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.customer` as c \n",
" ON c.customer_id=oh.customer_id;"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "42IxhtRRrvR-"
},
"source": [
"### <font color='#4285f4'>Clean Up</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "gaRnEKd6ehOQ"
},
"source": [
"#### Delete the cluster\n",
"Deletes the Dataproc cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "6lF2Z7skFbvf"
},
"outputs": [],
"source": [
"!gcloud dataproc clusters delete \"my-cluster\" \\\n",
" --project=\"${project_id}\" \\\n",
" --region=\"${dataproc_region}\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ASQ2BPisXDA0"
},
"source": [
"### <font color='#4285f4'>Reference Links</font>\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "rTY6xJdZ3ul8"
},
"source": [
"- [Enable data lineage in Dataproc](https://https://cloud.google.com/dataproc/docs/guides/lineage)"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [
"aXGUW2_zdrDi",
"1euw-NGIdrDk",
"HMsUvoF4BP7Y",
"42IxhtRRrvR-",
"ASQ2BPisXDA0"
],
"name": "Template",
"private_outputs": true,
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}