in dataproc/export_taxi_data_from_bq_to_gcs.py [0:0]
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute


def ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination):
    spark = SparkSession \
        .builder \
        .appName("export_taxi_data_from_bq_to_gcs") \
        .getOrCreate()
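
    # NOTE (assumption about the runtime environment, not configured in this
    # file): reading from BigQuery requires the spark-bigquery connector to be
    # available on the cluster, e.g. supplied to Dataproc via --jars or the
    # spark.jars.packages property.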
    # Use a Cloud Storage bucket for the temporary BigQuery export data used by the connector.
    spark.conf.set("temporaryGcsBucket", temporaryGcsBucket)
    # Export a single year; widen this list (e.g. add 2021) to export more data.
    years = [2019]
    for data_year in years:
        print("data_year: ", data_year)

        # December only; widen the range (e.g. range(1, 13)) to export more months.
        for data_month in range(12, 13):
            print("data_month: ", data_month)
            # Sample code: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example#pyspark
            # Enable SQL pushdown to BigQuery: the query below is materialized to a
            # temporary table in this project/dataset before Spark reads it.
            spark.conf.set("viewsEnabled", "true")
            spark.conf.set("materializationProject", project_id)
            spark.conf.set("materializationDataset", taxi_dataset_id)
print ("BEGIN: Querying Table")
sql = "SELECT * " + \
"FROM `" + project_id + "." + taxi_dataset_id + ".taxi_trips` " + \
"WHERE EXTRACT(YEAR FROM Pickup_DateTime) = " + str(data_year) + " " + \
"AND EXTRACT(MONTH FROM Pickup_DateTime) = " + str(data_month) + ";"
print ("SQL: ", sql)
df_taxi_trips = spark.read.format("bigquery").option("query", sql).load()
print ("END: Querying Table")
            # The full-table load below returns too much data to process with our
            # limited demo core CPU quota, so the filtered query above is used instead.
            # df_taxi_trips = spark.read.format("bigquery") \
            #     .option("table", project_id + ":" + taxi_dataset_id + ".taxi_trips") \
            #     .load()
print ("BEGIN: Adding partition columns to dataframe")
df_taxi_trips_partitioned = df_taxi_trips \
.withColumn("year", year (col("Pickup_DateTime"))) \
.withColumn("month", month (col("Pickup_DateTime"))) \
.withColumn("day", dayofmonth (col("Pickup_DateTime"))) \
.withColumn("hour", hour (col("Pickup_DateTime"))) \
.withColumn("minute", minute (col("Pickup_DateTime")))
print ("END: Adding partition columns to dataframe")
            # Write as Parquet, Hive-partitioned down to the minute.
            print("BEGIN: Writing Data to GCS")
            outputPath = destination + "/processed/taxi-trips-query-acceleration/"
            df_taxi_trips_partitioned \
                .write \
                .mode("append") \
                .partitionBy("year", "month", "day", "hour", "minute") \
                .parquet(outputPath)
            print("END: Writing Data to GCS")
    spark.stop()
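
# A minimal entry-point sketch, assuming the script is submitted (e.g. via
# `gcloud dataproc jobs submit pyspark`) with four positional arguments; the
# argument order here is illustrative, not taken from the original file.
if __name__ == "__main__":
    import sys

    if len(sys.argv) != 5:
        print("Usage: export_taxi_data_from_bq_to_gcs.py "
              "<project_id> <taxi_dataset_id> <temporaryGcsBucket> <destination>")
        sys.exit(1)

    ExportTaxiData(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])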