dataproc/export_taxi_data_biglake_storage_api.py (59 lines of code) (raw):

#!/usr/bin/env python #################################################################################### # Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #################################################################################### # Author: Adam Paternostro # Summary: Read the BigQuery "taxi_dataset.taxi_trips" table and exports to parquet format # This uses dataproc serverless spark # The data is exported partitioned by each minute (inefficient on purpose) # The goal is to generate a lot of small files (antipattern) to demo BQ performance on small files from pyspark.sql.dataframe import DataFrame from pyspark.sql import SparkSession from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType from datetime import datetime import time import sys def ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination): spark = SparkSession \ .builder \ .appName("export_taxi_data_biglake_storage_api") \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', temporaryGcsBucket) # SQL STATEMENT # Sample Code: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example#pyspark # To use SQL to BQ spark.conf.set("viewsEnabled","true") spark.conf.set("materializationProject",project_id) spark.conf.set("materializationDataset",taxi_dataset_id) print ("BEGIN: Querying Table (SQL STATEMENT)") sql = "SELECT * " + \ "FROM `" + project_id + "." + taxi_dataset_id + ".biglake_green_trips` " + \ "WHERE PULocationID = 168;" print ("SQL: ", sql) df_sql = spark.read.format("bigquery").option("query", sql).load() print ("END: Querying Table (SQL STATEMENT)") print ("BEGIN: Writing Data to GCS (SQL STATEMENT)") outputPath = destination + "/processed/df_sql/" df_sql \ .write \ .mode("overwrite") \ .parquet(outputPath) print ("END: Writing Data to GCS (SQL STATEMENT)") # Storage API # Returns too much data to process with our limited demo core CPU quota # Load data from BigQuery taxi_trips table print ("BEGIN: Querying Table TABLE LOAD)") df_table = spark.read.format('bigquery') \ .option('table', project_id + ':' + taxi_dataset_id + '.biglake_green_trips') \ .load() print ("END: Querying Table (TABLE LOAD)") # Write as Parquet print ("BEGIN: Writing Data to GCS (TABLE LOAD)") outputPath = destination + "/processed/df_table/" df_table \ .write \ .mode("overwrite") \ .parquet(outputPath) print ("END: Writing Data to GCS (STORAGE API)") spark.stop() # Main entry point if __name__ == "__main__": if len(sys.argv) != 5: print("Usage: export_taxi_data_biglake_storage_api project_id taxi_dataset_id temporaryGcsBucket destination") sys.exit(-1) project_id = sys.argv[1] taxi_dataset_id = sys.argv[2] temporaryGcsBucket = sys.argv[3] destination = sys.argv[4] print ("project_id: ", project_id) print ("taxi_dataset_id: ", taxi_dataset_id) print ("temporaryGcsBucket: ", temporaryGcsBucket) print ("destination: ", destination) print ("BEGIN: Main") ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination) print ("END: Main") # Sample run via command line using Dataproc Serverless # You must delete a lot of data from the taxi_trips table in order to test this. # The amount of files can overwhelm most Spark clusters """ project_string="ag66cqmpvd" project="paternostro-477-20221014175629" serviceAccount="dataproc-service-account@${project}.iam.gserviceaccount.com" rawBucket="raw-${project}-${project_string}" processedBucket="processed-${project}-${project_string}" gsutil cp ./dataproc/export_taxi_data_biglake_storage_api.py gs://${rawBucket}/pyspark-code batch=`date +%Y-%m-%d-%H-%M-%S` echo "batch: ${batch}" gcloud beta dataproc batches submit pyspark \ --project="${project}" \ --region="REPLACE-REGION" \ --batch="${batch}" \ gs://${rawBucket}/pyspark-code/export_taxi_data_biglake_storage_api.py \ --jars gs://${rawBucket}/pyspark-code/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --subnet="dataproc-serverless-subnet" \ --deps-bucket="gs://${project}-${project_string}" \ --service-account="${serviceAccount}" \ -- ${project} taxi_dataset bigspark-${project}-${project_string} gs://${processedBucket} # to cancel gcloud dataproc batches cancel ${batch} --project ${project} --region REPLACE-REGION """