in dataproc/export_taxi_data_biglake_storage_api.py [0:0]
from pyspark.sql import SparkSession


def ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination):
    spark = SparkSession \
        .builder \
        .appName("export_taxi_data_biglake_storage_api") \
        .getOrCreate()

    # Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
    spark.conf.set('temporaryGcsBucket', temporaryGcsBucket)
    # SQL STATEMENT
    # Sample Code: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example#pyspark
    # To run SQL against BigQuery, the connector materializes the query results into a
    # temporary table, so views must be enabled and a materialization project/dataset set.
    spark.conf.set("viewsEnabled", "true")
    spark.conf.set("materializationProject", project_id)
    spark.conf.set("materializationDataset", taxi_dataset_id)
print ("BEGIN: Querying Table (SQL STATEMENT)")
sql = "SELECT * " + \
"FROM `" + project_id + "." + taxi_dataset_id + ".biglake_green_trips` " + \
"WHERE PULocationID = 168;"
print ("SQL: ", sql)
df_sql = spark.read.format("bigquery").option("query", sql).load()
print ("END: Querying Table (SQL STATEMENT)")
print ("BEGIN: Writing Data to GCS (SQL STATEMENT)")
outputPath = destination + "/processed/df_sql/"
df_sql \
.write \
.mode("overwrite") \
.parquet(outputPath)
print ("END: Writing Data to GCS (SQL STATEMENT)")
    # Storage API
    # Loading the full table returns too much data to process with our limited demo core CPU quota.
    # Load data from the BigQuery biglake_green_trips table.
    print("BEGIN: Querying Table (TABLE LOAD)")
    df_table = spark.read.format('bigquery') \
        .option('table', project_id + ':' + taxi_dataset_id + '.biglake_green_trips') \
        .load()
    print("END: Querying Table (TABLE LOAD)")
    # Write as Parquet
    print("BEGIN: Writing Data to GCS (TABLE LOAD)")
    outputPath = destination + "/processed/df_table/"
    df_table \
        .write \
        .mode("overwrite") \
        .parquet(outputPath)
    print("END: Writing Data to GCS (TABLE LOAD)")

    spark.stop()
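
# --- Usage sketch (not in the original file) ---
# A minimal, hypothetical entry point showing how ExportTaxiData might be invoked when the
# script is submitted as a Dataproc/Spark job. The argument order and the use of sys.argv
# here are assumptions for illustration only, not the project's actual job wiring.
if __name__ == "__main__":
    import sys

    # Assumed positional arguments:
    #   project_id         - GCP project containing the BigQuery dataset
    #   taxi_dataset_id    - BigQuery dataset holding the biglake_green_trips table
    #   temporaryGcsBucket - GCS bucket for the connector's temporary export data
    #   destination        - GCS path prefix (e.g. gs://bucket/path) for the Parquet output
    project_id, taxi_dataset_id, temporaryGcsBucket, destination = sys.argv[1:5]
    ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination)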