### <font color='#4285f4'>Overview</font>

**Overview**: Create a Dataproc cluster that captures data lineage. The Spark job creates two tables in the enriched zone: `order_header_spark_lineage` and `order_detail_spark_lineage`. You can view their data lineage in BigQuery. You can also compare it with the lineage of the `order_header` and `order_detail` tables, which were processed with a BigQuery Spark stored procedure (which currently does not capture data lineage).


**Process Flow**:
1.  **Create a Dataproc cluster** with the following property: `--properties="dataproc:dataproc.lineage.enabled=true"`. This enables data lineage tracking on the cluster.

2.  **Run the Spark job** with the following properties set. Set `spark.openlineage.namespace` to the ID of the project that lineage should be sent to, and `spark.openlineage.appName` to any descriptive name.
    ```
    --properties=spark.openlineage.namespace=project-id,spark.openlineage.appName=OrderEnricher
    ```

3.  **Delete the cluster.**
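Both `--properties` values above are comma-separated `key=value` lists. As a minimal sketch (the helper name `format_properties` and the value `my-project-id` are hypothetical, not part of gcloud or this notebook), the value can be assembled from a Python dict. If any value itself contains a comma, the helper switches to gcloud's alternate-delimiter syntax (`^#^`, described under `gcloud topic escaping`), assuming `#` does not appear in any key or value.

```python
def format_properties(props: dict[str, str]) -> str:
    """Build a gcloud --properties value from a dict (hypothetical helper)."""
    items = [f"{key}={value}" for key, value in props.items()]
    if any("," in item for item in items):
        # A leading ^DELIM^ tells gcloud to split items on DELIM instead of ",",
        # so commas inside values are not treated as separators.
        if any("#" in item for item in items):
            raise ValueError("'#' cannot be used as the alternate delimiter")
        return "^#^" + "#".join(items)
    return ",".join(items)


lineage_props = {
    "spark.openlineage.namespace": "my-project-id",  # placeholder project ID
    "spark.openlineage.appName": "OrderEnricher",
}
print("--properties=" + format_properties(lineage_props))
```

Dicts preserve insertion order, so the properties appear in the order they are defined.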


Notes:
* This notebook uses a Dataproc cluster; you can also use Dataproc Serverless. You might need to set the metadata tags on your compute before creating the serverless job.

Cost:
* Approximate cost: about $1

Author:
* Adam Paternostro

In [None]:
# Architecture Diagram
from IPython.display import Image
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Spark-Lineage.png', width=1200)

### <font color='#4285f4'>Video Walkthrough</font>

[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Spark-Data-Lineage.mp4)


In [None]:
from IPython.display import HTML

HTML("""
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Spark-Data-Lineage.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")

### <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

In [None]:
# PIP Installs (if necessary)
import sys

# !{sys.executable} -m pip install REPLACE-ME

### <font color='#4285f4'>Run Dataproc using gcloud Commands</font>

#### Create the Dataproc cluster
- Creates the cluster and turns on lineage capture
- `--properties=dataproc:dataproc.lineage.enabled=true`

In [None]:
!gcloud dataproc clusters create "my-cluster" \
    --project="${project_id}" \
    --region="${dataproc_region}" \
    --num-masters=1 \
    --bucket="${governed_data_code_bucket}" \
    --temp-bucket="${governed_data_code_bucket}" \
    --master-machine-type="n1-standard-4" \
    --worker-machine-type="n1-standard-4" \
    --num-workers=2 \
    --image-version="2.1.75-debian11" \
    --subnet="dataproc-subnet" \
    --service-account="dataproc-service-account@${project_id}.iam.gserviceaccount.com" \
    --properties="dataproc:dataproc.lineage.enabled=true" \
    --no-address
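The same command can also be driven from Python with an argument list instead of a `!` shell escape, which sidesteps shell quoting. This is a minimal sketch: the function name `cluster_create_cmd` and the example values (`my-project-id`, `us-central1`, `my-code-bucket`) are hypothetical stand-ins for the notebook's `${...}` template variables, and the flags are copied from the cell above.

```python
import shlex


def cluster_create_cmd(project_id: str, region: str, bucket: str,
                       cluster: str = "my-cluster") -> list[str]:
    """Return the argv for the lineage-enabled cluster-create command (sketch)."""
    return [
        "gcloud", "dataproc", "clusters", "create", cluster,
        f"--project={project_id}",
        f"--region={region}",
        "--num-masters=1",
        f"--bucket={bucket}",
        f"--temp-bucket={bucket}",
        "--master-machine-type=n1-standard-4",
        "--worker-machine-type=n1-standard-4",
        "--num-workers=2",
        "--image-version=2.1.75-debian11",
        "--subnet=dataproc-subnet",
        f"--service-account=dataproc-service-account@{project_id}.iam.gserviceaccount.com",
        # This cluster property is what turns on lineage capture.
        "--properties=dataproc:dataproc.lineage.enabled=true",
        "--no-address",
    ]


cmd = cluster_create_cmd("my-project-id", "us-central1", "my-code-bucket")
print(shlex.join(cmd))  # shell-quoted form, handy for logging
# To actually create the cluster (requires an authenticated gcloud CLI):
# import subprocess; subprocess.run(cmd, check=True)
```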


#### Run the PySpark job
- Runs the job and passes in the lineage parameters
- `--properties=`
  - `spark.openlineage.namespace=${project_id}`
  - `spark.openlineage.appName=OrderEnricher`

In [None]:
!gcloud dataproc jobs submit pyspark  \
   --project="${project_id}" \
   --region="${dataproc_region}" \
   --cluster="my-cluster" \
   --properties=spark.openlineage.namespace=${project_id},spark.openlineage.appName=OrderEnricher \
   gs://${governed_data_code_bucket}/dataproc/transform_order_pyspark.py
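A minimal Python sketch of the same submission, building the OpenLineage properties from a dict so the namespace always matches the project ID. The function name `pyspark_submit_cmd` and the example values (`my-project-id`, `us-central1`, `my-code-bucket`) are hypothetical placeholders for the `${...}` template variables; the flags and script path mirror the cell above.

```python
import shlex


def pyspark_submit_cmd(project_id: str, region: str, bucket: str,
                       app_name: str = "OrderEnricher",
                       cluster: str = "my-cluster") -> list[str]:
    """Return the argv for submitting the lineage-enabled PySpark job (sketch)."""
    # namespace: the project that receives lineage; appName: free-form label.
    properties = {
        "spark.openlineage.namespace": project_id,
        "spark.openlineage.appName": app_name,
    }
    props_value = ",".join(f"{k}={v}" for k, v in properties.items())
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark",
        f"--project={project_id}",
        f"--region={region}",
        f"--cluster={cluster}",
        f"--properties={props_value}",
        # The main PySpark file is passed as a positional argument.
        f"gs://{bucket}/dataproc/transform_order_pyspark.py",
    ]


cmd = pyspark_submit_cmd("my-project-id", "us-central1", "my-code-bucket")
print(shlex.join(cmd))
# import subprocess; subprocess.run(cmd, check=True)  # needs gcloud + auth
```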

Recreate the curated `sales` table from the Spark-created tables so that the lineage graph shows the Spark job upstream of `sales`.

In [None]:
%%bigquery

CREATE OR REPLACE TABLE `${project_id}.${bigquery_governed_data_curated_dataset}.sales` AS
SELECT p.product_name,
       p.product_description,
       pd.product_category_name,
       pd.product_category_description,
       oh.region,
       oh.order_datetime,
       od.price,
       od.quantity,
       c.*
  FROM `${project_id}.${bigquery_governed_data_enriched_dataset}.order_header_spark_lineage` oh
      LEFT JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.order_detail_spark_lineage` od
             ON oh.order_id=od.order_id
      INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.product` AS p
              ON od.product_id=p.product_id
      INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.product_category` AS pd
              ON pd.product_category_id=p.product_category_id
      INNER JOIN `${project_id}.${bigquery_governed_data_enriched_dataset}.customer` AS c
              ON c.customer_id=oh.customer_id;
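Outside a notebook that has the `%%bigquery` magic, the same statement could be run with the `google-cloud-bigquery` client library. This is a minimal sketch, assuming that package is installed and credentials are configured. `build_sales_sql`, `run_sales_sql`, and the dataset names (`my_enriched_dataset`, `my_curated_dataset`) are hypothetical stand-ins for the `${...}` template variables. The client import is deferred so that the SQL builder runs without the library.

```python
def build_sales_sql(project_id: str, enriched: str, curated: str) -> str:
    """Return the sales CREATE OR REPLACE TABLE statement with concrete names."""

    def enriched_table(name: str) -> str:
        # Fully qualified, backtick-quoted BigQuery table reference.
        return f"`{project_id}.{enriched}.{name}`"

    header = enriched_table("order_header_spark_lineage")
    detail = enriched_table("order_detail_spark_lineage")
    product = enriched_table("product")
    category = enriched_table("product_category")
    customer = enriched_table("customer")
    return f"""
CREATE OR REPLACE TABLE `{project_id}.{curated}.sales` AS
SELECT p.product_name,
       p.product_description,
       pd.product_category_name,
       pd.product_category_description,
       oh.region,
       oh.order_datetime,
       od.price,
       od.quantity,
       c.*
  FROM {header} AS oh
       LEFT JOIN {detail} AS od
              ON oh.order_id = od.order_id
       INNER JOIN {product} AS p
               ON od.product_id = p.product_id
       INNER JOIN {category} AS pd
               ON pd.product_category_id = p.product_category_id
       INNER JOIN {customer} AS c
               ON c.customer_id = oh.customer_id
"""


def run_sales_sql(sql: str, project_id: str) -> None:
    """Run the statement with the BigQuery client library (needs credentials)."""
    from google.cloud import bigquery  # pip install google-cloud-bigquery

    client = bigquery.Client(project=project_id)
    client.query(sql).result()  # blocks until the query job finishes


sql = build_sales_sql("my-project-id", "my_enriched_dataset", "my_curated_dataset")
print(sql)
# run_sales_sql(sql, "my-project-id")
```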

### <font color='#4285f4'>Clean Up</font>

#### Delete the cluster
Deletes the Dataproc cluster

In [None]:
!gcloud dataproc clusters delete "my-cluster" \
   --project="${project_id}" \
   --region="${dataproc_region}" \
   --quiet

### <font color='#4285f4'>Reference Links</font>


- [Enable data lineage in Dataproc](https://cloud.google.com/dataproc/docs/guides/lineage)