def ConvertTaxiData()

in dataproc/convert_taxi_to_parquet.py [0:0]


# Inclusive (year, month) range of the monthly trip files that get converted.
# The file list AND the Pickup_DateTime filter are both derived from these.
_FIRST_TRIP_MONTH = (2019, 1)
_LAST_TRIP_MONTH = (2024, 2)

# Small static lookup tables: (output folder, column names, rows).
_LOOKUP_TABLES = (
    (
        "vendor_table/",
        ["Vendor_Id", "Vendor_Description"],
        [
            (1, "Creative Mobile Technologies"),
            (2, "VeriFone"),
        ],
    ),
    (
        "rate_code_table/",
        ["Rate_Code_Id", "Rate_Code_Description"],
        [
            (1, "Standard rate"),
            (2, "JFK"),
            (3, "Newark"),
            (4, "Nassau or Westchester"),
            (5, "Negotiated fare"),
            (6, "Group ride"),
        ],
    ),
    (
        "payment_type_table/",
        ["Payment_Type_Id", "Payment_Type_Description"],
        [
            (1, "Credit card"),
            (2, "Cash"),
            (3, "No charge"),
            (4, "Dispute"),
            (5, "Unknown"),
            (6, "Voided trip"),
        ],
    ),
    (
        "trip_type_table/",
        ["Trip_Type_Id", "Trip_Type_Description"],
        [
            (1, "Street-hail"),
            (2, "Dispatch"),
        ],
    ),
)


def _trip_months(first=_FIRST_TRIP_MONTH, last=_LAST_TRIP_MONTH):
    """Yield (year, month) tuples from ``first`` through ``last`` inclusive, in order."""
    yr, mo = first
    while (yr, mo) <= last:
        yield yr, mo
        yr, mo = (yr + 1, 1) if mo == 12 else (yr, mo + 1)


def _load_trips(spark, base_path, color, process_file):
    """Read every monthly file of one taxi colour and union them into one DataFrame."""
    combined = None
    for yr, mo in _trip_months():
        file_path = f"{base_path}/{yr}/{color}_tripdata_{yr}-{mo:02d}.parquet"
        # Files are processed one by one instead of relying on mergeSchema,
        # because some fields change to incompatible data types between months.
        monthly = process_file(spark, file_path)
        combined = monthly if combined is None else combined.union(monthly)
    return combined


def _add_partition_columns(trips):
    """Add ``year``/``month`` columns and keep only rows inside the converted date range."""
    first_yr, first_mo = _FIRST_TRIP_MONTH
    last_yr, last_mo = _LAST_TRIP_MONTH
    end_yr, end_mo = (last_yr + 1, 1) if last_mo == 12 else (last_yr, last_mo + 1)

    lower = f"{first_yr}-{first_mo:02d}-01"
    # Exclusive upper bound (first day of the following month) so the whole of
    # the final month is kept. An inclusive "<= last day" bound compares against
    # midnight when the column holds timestamps and drops that day's trips.
    upper = f"{end_yr}-{end_mo:02d}-01"

    return (
        trips
        .withColumn("year", year(col("Pickup_DateTime")))
        .withColumn("month", month(col("Pickup_DateTime")))
        .filter(f"Pickup_DateTime >= '{lower}' AND Pickup_DateTime < '{upper}'")
    )


def _write_trips(trips, target_root):
    """Write ``trips`` as Parquet, CSV and JSON under ``target_root``, partitioned by year/month."""

    def partitioned_writer():
        # repartition(5) already yields exactly 5 partitions, so no coalesce is needed.
        return (
            trips
            .repartition(5)
            .write
            .mode("overwrite")
            .partitionBy("year", "month")
        )

    partitioned_writer().parquet(target_root + "parquet")
    partitioned_writer().format("csv").option("header", True).save(target_root + "csv")
    partitioned_writer().format("json").save(target_root + "json")


def _write_lookup_table(spark, rows, columns, target):
    """Create a small DataFrame from ``rows`` and overwrite ``target`` with it as Parquet."""
    spark.createDataFrame(rows, columns) \
        .repartition(1) \
        .write \
        .mode("overwrite") \
        .parquet(target)


def ConvertTaxiData(sourceYellow, sourceGreen, destination):
    """Convert the monthly yellow and green taxi trip files into partitioned tables.

    For each colour, every monthly file from 2019-01 through 2024-02 is read
    through its ``Process*File`` function, the results are unioned, ``year``
    and ``month`` columns are derived from ``Pickup_DateTime``, and the data is
    written as Parquet, CSV and JSON. Four static lookup tables (vendor, rate
    code, payment type, trip type) are then written as Parquet.

    Args:
        sourceYellow: Path of the yellow trip files; a trailing
            ``/*/*.parquet`` glob is stripped to obtain the base folder.
        sourceGreen: Same as ``sourceYellow`` for the green trip files.
        destination: Output prefix. It is concatenated directly with the
            relative output paths, so it must end with a path separator.
    """
    print("ConvertTaxiData: sourceYellow: ",sourceYellow)
    print("ConvertTaxiData: sourceGreen:  ",sourceGreen)
    print("ConvertTaxiData: destination:  ",destination)

    spark = SparkSession \
        .builder \
        .appName("ConvertTaxiData") \
        .getOrCreate()

    try:
        ############################################################################################
        # Yellow, then Green
        ############################################################################################
        trip_sources = (
            ("yellow", sourceYellow, ProcessYellowFile),
            ("green", sourceGreen, ProcessGreenFile),
        )
        for color, source, process_file in trip_sources:
            base_path = source.replace("/*/*.parquet","")

            # Merge everything together (all months share one schema after processing)
            all_trips = _load_trips(spark, base_path, color, process_file)
            trips_with_partition_cols = _add_partition_columns(all_trips)

            _write_trips(trips_with_partition_cols, destination + color + "/trips_table/")

        ############################################################################################
        # Create Common Tables
        ############################################################################################
        for folder, columns, rows in _LOOKUP_TABLES:
            _write_lookup_table(spark, rows, columns, destination + folder)
    finally:
        # Always release the session, even when a read or write fails.
        spark.stop()