####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     https://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Opens the taxi parquet files and renames the fields as well as partitions the data.

# Note: NYC changed its trip data from CSV to Parquet format; this file was updated on 05/16/2022 to read Parquet.

from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from datetime import datetime
import time
import sys


# The files cannot all be read in a single pass because mergeSchema does not work for the NYC data
def ProcessGreenFile(spark, filenameAndPath):
    """Load green-taxi parquet data and project it onto the renamed, typed trip columns.

    Args:
        spark: Active SparkSession used to read the data.
        filenameAndPath: Path handed to the parquet reader; it is also echoed to stdout.

    Returns:
        A DataFrame holding only the 20 renamed columns listed in ``output_columns``,
        in that order.
    """
    raw_trips = spark.read.option("mergeSchema", "true").parquet(filenameAndPath)

    print("ProcessGreenFile: filenameAndPath: ",filenameAndPath)

    # (output column, source column, Spark type). Entries whose output and source
    # names are identical re-type the existing column in place.
    column_casts = (
        ("Vendor_ID", "VendorID", IntegerType()),
        ("Pickup_DateTime", "lpep_pickup_datetime", TimestampType()),
        ("Dropoff_DateTime", "lpep_dropoff_datetime", TimestampType()),
        ("Store_And_Forward", "store_and_fwd_flag", StringType()),
        ("Rate_Code_Id", "RatecodeID", IntegerType()),
        ("PULocationID", "PULocationID", IntegerType()),
        ("DOLocationID", "DOLocationID", IntegerType()),
        ("Passenger_Count", "passenger_count", IntegerType()),
        ("Trip_Distance", "trip_distance", DoubleType()),
        ("Fare_Amount", "fare_amount", DoubleType()),
        ("Surcharge", "extra", DoubleType()),
        ("MTA_Tax", "mta_tax", DoubleType()),
        ("Tip_Amount", "tip_amount", DoubleType()),
        ("Tolls_Amount", "tolls_amount", DoubleType()),
        ("Ehail_Fee", "ehail_fee", DoubleType()),
        ("Improvement_Surcharge", "improvement_surcharge", DoubleType()),
        ("Total_Amount", "total_amount", DoubleType()),
        ("Payment_Type_Id", "payment_type", IntegerType()),
        ("Trip_Type", "trip_type", IntegerType()),
        ("Congestion_Surcharge", "congestion_surcharge", DoubleType()),
    )

    typed_trips = raw_trips
    for output_name, source_name, spark_type in column_casts:
        typed_trips = typed_trips.withColumn(output_name, col(source_name).cast(spark_type))

    # NOTE(review): the vendor column is created as "Vendor_ID" but selected as
    # 'Vendor_Id'; this resolves only while Spark matches column names
    # case-insensitively (its default) - confirm before changing that setting.
    output_columns = [
        'Vendor_Id',
        'Pickup_DateTime',
        'Dropoff_DateTime',
        'Store_And_Forward',
        'Rate_Code_Id',
        'PULocationID',
        'DOLocationID',
        'Passenger_Count',
        'Trip_Distance',
        'Fare_Amount',
        'Surcharge',
        'MTA_Tax',
        'Tip_Amount',
        'Tolls_Amount',
        'Ehail_Fee',
        'Improvement_Surcharge',
        'Total_Amount',
        'Payment_Type_Id',
        'Trip_Type',
        'Congestion_Surcharge',
    ]

    return typed_trips.select(*output_columns)


# The files cannot all be read in a single pass because mergeSchema does not work for the NYC data
def ProcessYellowFile(spark, filenameAndPath):
    """Load yellow-taxi parquet data and project it onto the renamed, typed trip columns.

    Args:
        spark: Active SparkSession used to read the data.
        filenameAndPath: Path handed to the parquet reader; it is also echoed to stdout.

    Returns:
        A DataFrame holding only the 18 renamed columns listed in ``output_columns``,
        in that order.
    """
    raw_trips = spark.read.parquet(filenameAndPath)

    print("ProcessYellowFile: filenameAndPath: ",filenameAndPath)

    # (output column, source column, Spark type). Entries whose output and source
    # names are identical re-type the existing column in place.
    # airport_fee is not carried into the output; its cast is disabled.
    column_casts = (
        ("Vendor_ID", "VendorID", IntegerType()),
        ("Pickup_DateTime", "tpep_pickup_datetime", TimestampType()),
        ("Dropoff_DateTime", "tpep_dropoff_datetime", TimestampType()),
        ("Passenger_Count", "passenger_count", IntegerType()),
        ("Trip_Distance", "trip_distance", DoubleType()),
        ("Rate_Code_Id", "RatecodeID", IntegerType()),
        ("Store_And_Forward", "store_and_fwd_flag", StringType()),
        ("PULocationID", "PULocationID", IntegerType()),
        ("DOLocationID", "DOLocationID", IntegerType()),
        ("Payment_Type_Id", "payment_type", IntegerType()),
        ("Fare_Amount", "fare_amount", DoubleType()),
        ("Surcharge", "extra", DoubleType()),
        ("MTA_Tax", "mta_tax", DoubleType()),
        ("Tip_Amount", "tip_amount", DoubleType()),
        ("Tolls_Amount", "tolls_amount", DoubleType()),
        ("Improvement_Surcharge", "improvement_surcharge", DoubleType()),
        ("Total_Amount", "total_amount", DoubleType()),
        ("Congestion_Surcharge", "congestion_surcharge", DoubleType()),
    )

    typed_trips = raw_trips
    for output_name, source_name, spark_type in column_casts:
        typed_trips = typed_trips.withColumn(output_name, col(source_name).cast(spark_type))

    # NOTE(review): the vendor column is created as "Vendor_ID" but selected as
    # 'Vendor_Id'; this resolves only while Spark matches column names
    # case-insensitively (its default) - confirm before changing that setting.
    output_columns = [
        'Vendor_Id',
        'Pickup_DateTime',
        'Dropoff_DateTime',
        'Passenger_Count',
        'Trip_Distance',
        'Rate_Code_Id',
        'Store_And_Forward',
        'PULocationID',
        'DOLocationID',
        'Payment_Type_Id',
        'Fare_Amount',
        'Surcharge',
        'MTA_Tax',
        'Tip_Amount',
        'Tolls_Amount',
        'Improvement_Surcharge',
        'Total_Amount',
        'Congestion_Surcharge',
    ]

    return typed_trips.select(*output_columns)


# Inclusive (year, month) bounds of the monthly trip files converted by this job.
_FIRST_TRIP_MONTH = (2019, 1)
_LAST_TRIP_MONTH = (2024, 2)


def _trip_months():
    """Return every (year, month) from _FIRST_TRIP_MONTH through _LAST_TRIP_MONTH, in order."""
    months = []
    year_num, month_num = _FIRST_TRIP_MONTH
    while (year_num, month_num) <= _LAST_TRIP_MONTH:
        months.append((year_num, month_num))
        if month_num == 12:
            year_num, month_num = year_num + 1, 1
        else:
            month_num += 1
    return months


def _pickup_window():
    """Return (start, end) date strings for the pickup filter; start is inclusive, end exclusive."""
    first_year, first_month = _FIRST_TRIP_MONTH
    last_year, last_month = _LAST_TRIP_MONTH
    if last_month == 12:
        end_year, end_month = last_year + 1, 1
    else:
        end_year, end_month = last_year, last_month + 1
    return (f"{first_year}-{first_month:02d}-01", f"{end_year}-{end_month:02d}-01")


def _load_trips(spark, source, color, process_file):
    """Read each monthly file for one taxi color with process_file and union them in date order."""
    base_path = source.replace("/*/*.parquet", "")

    # The NYC taxi data has fields that change data types and some are incompatible
    # with mergeSchema, so each month is read and normalized separately before the union.
    combined = None
    for year_num, month_num in _trip_months():
        monthly = process_file(
            spark,
            f"{base_path}/{year_num}/{color}_tripdata_{year_num}-{month_num:02d}.parquet")
        combined = monthly if combined is None else combined.union(monthly)
    return combined


def _write_trips(trips, destination, color):
    """Add year/month partition columns, filter to the pickup window, and write Parquet, CSV and JSON."""
    window_start, window_end = _pickup_window()

    # The upper bound is exclusive: a bare date literal is compared as midnight at the
    # start of that day, so an inclusive "<= last day" bound would drop nearly every
    # trip picked up on the final day of the window.
    partitioned = trips \
        .withColumn("year", year(col("Pickup_DateTime"))) \
        .withColumn("month", month(col("Pickup_DateTime"))) \
        .filter(f"Pickup_DateTime >= '{window_start}' AND Pickup_DateTime < '{window_end}'")

    for output_format in ("parquet", "csv", "json"):
        # repartition(5) alone fixes the partition count; a following coalesce(5)
        # to the same count adds nothing.
        writer = partitioned \
            .repartition(5) \
            .write \
            .mode("overwrite") \
            .partitionBy("year", "month") \
            .format(output_format)
        if output_format == "csv":
            writer = writer.option('header', True)
        writer.save(f"{destination}{color}/trips_table/{output_format}")


def _write_lookup_table(spark, rows, columns, path):
    """Write a small in-memory lookup table to path as a single-partition Parquet dataset."""
    spark.createDataFrame(rows, columns) \
        .repartition(1) \
        .write \
        .mode("overwrite") \
        .parquet(path)


def ConvertTaxiData(sourceYellow, sourceGreen, destination):
    """Convert the monthly NYC yellow and green taxi files and write the lookup tables.

    For each taxi color, every monthly file from _FIRST_TRIP_MONTH through
    _LAST_TRIP_MONTH is read, normalized by ProcessYellowFile / ProcessGreenFile,
    unioned, filtered to that pickup window, and written partitioned by year and
    month as Parquet, CSV and JSON. Vendor, rate code, payment type and trip type
    lookup tables are then written as Parquet.

    Args:
        sourceYellow: Root of the yellow files; a trailing "/*/*.parquet" glob is stripped.
        sourceGreen: Root of the green files; a trailing "/*/*.parquet" glob is stripped.
        destination: Output path prefix. A missing trailing "/" is added so outputs
            land beneath the destination rather than beside it.

    Returns:
        None. All results are written under ``destination``.
    """
    print("ConvertTaxiData: sourceYellow: ",sourceYellow)
    print("ConvertTaxiData: sourceGreen:  ",sourceGreen)
    print("ConvertTaxiData: destination:  ",destination)

    # Output paths are built by plain concatenation, so without a trailing slash
    # "gs://bucket/out" would yield "gs://bucket/outyellow/...". An empty
    # destination is left untouched so relative paths are not turned into "/...".
    if destination and not destination.endswith("/"):
        destination = destination + "/"

    spark = SparkSession \
        .builder \
        .appName("ConvertTaxiData") \
        .getOrCreate()

    try:
        ############################################################################################
        # Yellow
        ############################################################################################
        df_yellow_final = _load_trips(spark, sourceYellow, "yellow", ProcessYellowFile)
        _write_trips(df_yellow_final, destination, "yellow")

        ############################################################################################
        # Green
        ############################################################################################
        df_green_final = _load_trips(spark, sourceGreen, "green", ProcessGreenFile)
        _write_trips(df_green_final, destination, "green")

        ############################################################################################
        # Create Common Tables
        ############################################################################################
        # Vendor Table
        _write_lookup_table(
            spark,
            [
                (1, "Creative Mobile Technologies"),
                (2, "VeriFone"),
            ],
            ["Vendor_Id", "Vendor_Description"],
            destination + "vendor_table/")

        # Rate Code Table
        _write_lookup_table(
            spark,
            [
                (1, "Standard rate"),
                (2, "JFK"),
                (3, "Newark"),
                (4, "Nassau or Westchester"),
                (5, "Negotiated fare"),
                (6, "Group ride"),
            ],
            ["Rate_Code_Id", "Rate_Code_Description"],
            destination + "rate_code_table/")

        # Payment Type Table
        _write_lookup_table(
            spark,
            [
                (1, "Credit card"),
                (2, "Cash"),
                (3, "No charge"),
                (4, "Dispute"),
                (5, "Unknown"),
                (6, "Voided trip"),
            ],
            ["Payment_Type_Id", "Payment_Type_Description"],
            destination + "payment_type_table/")

        # Trip Type Table
        _write_lookup_table(
            spark,
            [
                (1, "Street-hail"),
                (2, "Dispatch"),
            ],
            ["Trip_Type_Id", "Trip_Type_Description"],
            destination + "trip_type_table/")
    finally:
        # Stop the session even when a read or write fails part-way through.
        spark.stop()


# Main entry point
# The destination is used as a path prefix for every output folder, so end it with "/".
# convert_taxi_to_parquet gs://big-query-demo-09/test-taxi/yellow gs://big-query-demo-09/test-taxi/green gs://big-query-demo-09/test-taxi-output/
if __name__ == "__main__":
    # Exactly three positional arguments are required after the script name.
    cli_args = sys.argv[1:]
    if len(cli_args) != 3:
        print("Usage: convert_taxi_to_parquet sourceYellow sourceGreen destination")
        sys.exit(-1)

    sourceYellow, sourceGreen, destination = cli_args

    print("BEGIN: Main")
    ConvertTaxiData(sourceYellow, sourceGreen, destination)
    print("END: Main")

# Sample run 
# gcloud dataproc jobs submit pyspark  \
#    --cluster "testcluster" \
#    --region="REPLACE-REGION" \
#    gs://big-query-demo-09/pyspark-code/convert_taxi_to_parquet.py \
#    -- gs://big-query-demo-09/test-taxi/yellow/*/*.parquet \
#       gs://big-query-demo-09/test-taxi/green/*/*.parquet \
#       gs://big-query-demo-09/test-taxi/dest/