data-analytics-demos/bigquery-data-governance/dataproc/transform_order_pyspark.py (45 lines of code) (raw):

from pyspark.sql import SparkSession from pyspark.sql.functions import first, col, concat, lit from pyspark.sql.functions import to_timestamp, date_format, from_unixtime # Create a SparkSession spark = SparkSession.builder.appName("OrderEnricher").getOrCreate() # Read the source table customer_transaction_df = spark.read.format("bigquery") \ .option("table", "${project_id}.${bigquery_governed_data_raw_dataset}.customer_transaction") \ .load() # Create order header table order_header_df = customer_transaction_df.groupBy("customer_id", "order_date", "order_time") \ .agg(first("transaction_id").alias("order_id"), first("region").alias("region")) # Create order detail table order_detail_df = customer_transaction_df.select( col("transaction_id").alias("order_id"), col("product"), col("quantity"), col("price") ) # Look up product_id from product table product_df = spark.read.format("bigquery") \ .option("table", "${project_id}.${bigquery_governed_data_enriched_dataset}.product") \ .load() order_detail_enriched_df = order_detail_df.join( product_df, order_detail_df["product"] == product_df["product_name"], "left" ).select( col("order_id"), col("product_id"), col("quantity"), col("price") ) # Combine order_date and order_time into a datetime column order_header_enriched_df = order_header_df.withColumn( "order_datetime", to_timestamp( concat( date_format(col("order_date"), "yyyy-MM-dd"), lit(" "), from_unixtime(col("order_time") / (60 * 60 * 24), "HH:mm:ss") ), "yyyy-MM-dd HH:mm:ss" ) ).drop("order_date", "order_time") # Write to BigQuery order_header_enriched_df.write.format("bigquery") \ .option("temporaryGcsBucket","${governed_data_code_bucket}") \ .option("table", "${project_id}.${bigquery_governed_data_enriched_dataset}.order_header_spark_lineage") \ .mode("overwrite").save() order_detail_enriched_df.write.format("bigquery") \ .option("temporaryGcsBucket","${governed_data_code_bucket}") \ .option("table", "${project_id}.${bigquery_governed_data_enriched_dataset}.order_detail_spark_lineage") \ .mode("overwrite").save() spark.stop()