def ExportTaxiData()

in dataproc/export_taxi_data_from_bq_to_gcs.py
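
Reads taxi trip rows for the selected year/month combinations from BigQuery, adds year/month/day/hour/minute partition columns, and appends the result to GCS as partitioned Parquet.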


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute


def ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination):
    spark = SparkSession \
        .builder \
        .appName("export_taxi_data_from_bq_to_gcs") \
        .getOrCreate()

    # Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
    spark.conf.set('temporaryGcsBucket', temporaryGcsBucket)

    # Only December 2019 is exported by default (the full table exceeds the
    # demo's limited core CPU quota); widen these ranges to export more data.
    years = [2019]
    # years = [2021]
    for data_year in years:
        print("data_year: ", data_year)
        for data_month in range(12, 13):
        # for data_month in range(1, 3):
            print("data_month: ", data_month)

            # Run SQL against BigQuery; sample code:
            # https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example#pyspark
            spark.conf.set("viewsEnabled","true")
            spark.conf.set("materializationProject",project_id)
            spark.conf.set("materializationDataset",taxi_dataset_id)
            print ("BEGIN: Querying Table")
            sql = "SELECT * " + \
                    "FROM `" + project_id + "." + taxi_dataset_id + ".taxi_trips` " + \
                   "WHERE EXTRACT(YEAR  FROM Pickup_DateTime) = " + str(data_year)  + " " + \
                     "AND EXTRACT(MONTH FROM Pickup_DateTime) = " + str(data_month) + ";"
            print ("SQL: ", sql)
            df_taxi_trips = spark.read.format("bigquery").option("query", sql).load()
            print ("END: Querying Table")
     
            # Alternative: load the entire taxi_trips table from BigQuery.
            # Disabled because it returns too much data to process with our limited demo core CPU quota.
            """
            print ("BEGIN: Querying Table")
            df_taxi_trips = spark.read.format('bigquery') \
                .option('table', project_id + ':' + taxi_dataset_id + '.taxi_trips') \
                .load()
            print ("END: Querying Table")
            """

            print ("BEGIN: Adding partition columns to dataframe")
            df_taxi_trips_partitioned = df_taxi_trips \
                .withColumn("year",   year       (col("Pickup_DateTime"))) \
                .withColumn("month",  month      (col("Pickup_DateTime"))) \
                .withColumn("day",    dayofmonth (col("Pickup_DateTime"))) \
                .withColumn("hour",   hour       (col("Pickup_DateTime"))) \
                .withColumn("minute", minute     (col("Pickup_DateTime"))) 
            print ("END: Adding partition columns to dataframe")

            # Write as Parquet, partitioned Hive-style
            # (year=YYYY/month=M/day=D/hour=H/minute=M directories under outputPath).
            print("BEGIN: Writing Data to GCS")
            outputPath = destination + "/processed/taxi-trips-query-acceleration/"
            df_taxi_trips_partitioned \
                .write \
                .mode("append") \
                .partitionBy("year", "month", "day", "hour", "minute") \
                .parquet(outputPath)
            print("END: Writing Data to GCS")
                
    spark.stop()
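
A minimal sketch of how this function might be invoked as a Dataproc PySpark job. The __main__ guard, the positional argument order, and the submission command are assumptions for illustration, not part of the original file:

# Hypothetical entry point (not in the original file); the four arguments
# are passed positionally on the command line.
if __name__ == "__main__":
    import sys
    if len(sys.argv) != 5:
        print("Usage: export_taxi_data_from_bq_to_gcs.py "
              "<project_id> <taxi_dataset_id> <temporaryGcsBucket> <destination>")
        sys.exit(1)
    ExportTaxiData(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])

# Example submission (cluster, region, and bucket values are placeholders;
# the connector jar is the one used in the Google tutorial linked above):
# gcloud dataproc jobs submit pyspark export_taxi_data_from_bq_to_gcs.py \
#     --cluster=<cluster> --region=<region> \
#     --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
#     -- <project_id> <taxi_dataset_id> <temp-bucket> gs://<bucket>/taxi-data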