in dataproc/convert_taxi_to_parquet.py [0:0]
# First and last (year, month) of the monthly NYC taxi trip files to convert.
# Both the list of input files and the Pickup_DateTime filter are derived from
# these two values so they cannot drift apart.
_FIRST_TRIP_MONTH = (2019, 1)
_LAST_TRIP_MONTH = (2024, 2)

# Static lookup tables written next to the trip data:
# (output folder, column names, rows).
_LOOKUP_TABLES = (
    (
        "vendor_table/",
        ["Vendor_Id", "Vendor_Description"],
        [
            (1, "Creative Mobile Technologies"),
            (2, "VeriFone"),
        ],
    ),
    (
        "rate_code_table/",
        ["Rate_Code_Id", "Rate_Code_Description"],
        [
            (1, "Standard rate"),
            (2, "JFK"),
            (3, "Newark"),
            (4, "Nassau or Westchester"),
            (5, "Negotiated fare"),
            (6, "Group ride"),
        ],
    ),
    (
        "payment_type_table/",
        ["Payment_Type_Id", "Payment_Type_Description"],
        [
            (1, "Credit card"),
            (2, "Cash"),
            (3, "No charge"),
            (4, "Dispute"),
            (5, "Unknown"),
            (6, "Voided trip"),
        ],
    ),
    (
        "trip_type_table/",
        ["Trip_Type_Id", "Trip_Type_Description"],
        [
            (1, "Street-hail"),
            (2, "Dispatch"),
        ],
    ),
)


def _NextMonth(trip_year, trip_month):
    """Return the (year, month) pair that immediately follows the given one."""
    if trip_month == 12:
        return (trip_year + 1, 1)
    return (trip_year, trip_month + 1)


def _IterTripMonths(first=_FIRST_TRIP_MONTH, last=_LAST_TRIP_MONTH):
    """Yield every (year, month) pair from ``first`` to ``last``, inclusive."""
    current = first
    while current <= last:
        yield current
        current = _NextMonth(*current)


def _LoadTrips(spark, process_file, base_path, color):
    """Load every monthly file for one taxi color and union them into one DataFrame.

    Args:
        spark: The active SparkSession.
        process_file: Callable ``(spark, path) -> DataFrame`` applied to each
            monthly file (ProcessYellowFile / ProcessGreenFile).
        base_path: Folder that contains the per-year sub-folders.
        color: File-name prefix, ``"yellow"`` or ``"green"``.

    Returns:
        The positional union of all monthly DataFrames, oldest month first.
    """
    # The NYC taxi data has fields that change data types and some are
    # incompatible with mergeSchema, so each month is read on its own.
    # NOTE(review): process_file is presumably what normalizes the schema;
    # it is defined outside this block - confirm before relying on it.
    monthly_frames = [
        process_file(
            spark,
            f"{base_path}/{trip_year}/{color}_tripdata_{trip_year}-{trip_month:02d}.parquet",
        )
        for trip_year, trip_month in _IterTripMonths()
    ]

    # Merge everything together (we should have all the same schema now).
    # union() matches columns by position, exactly like the original chain.
    merged = monthly_frames[0]
    for frame in monthly_frames[1:]:
        merged = merged.union(frame)
    return merged


def _AddPartitionColumns(trips):
    """Add ``year``/``month`` columns and drop rows outside the loaded months."""
    first_year, first_month = _FIRST_TRIP_MONTH
    end_year, end_month = _NextMonth(*_LAST_TRIP_MONTH)
    lower_bound = f"{first_year:04d}-{first_month:02d}-01"
    upper_bound = f"{end_year:04d}-{end_month:02d}-01"

    # Half-open range [first day of first month, first day after last month).
    # The previous "<= '2024-02-29'" bound is compared against midnight when
    # Pickup_DateTime is a timestamp, which discarded almost every trip on the
    # last day. NOTE(review): assumes Pickup_DateTime is a timestamp/date
    # column; its type is set outside this block.
    return trips \
        .withColumn("year", year(col("Pickup_DateTime"))) \
        .withColumn("month", month(col("Pickup_DateTime"))) \
        .filter(
            f"Pickup_DateTime >= '{lower_bound}' AND Pickup_DateTime < '{upper_bound}'"
        )


def _WriteTrips(trips, base_path):
    """Write ``trips`` under ``base_path`` as parquet, csv and json.

    Each format goes to ``<base_path>/<format>``, overwriting existing output
    and partitioned by the ``year`` and ``month`` columns.
    """
    for file_format in ("parquet", "csv", "json"):
        # repartition(5) alone fixes the partition count; the coalesce(5) that
        # used to follow it was a no-op.
        writer = trips \
            .repartition(5) \
            .write \
            .mode("overwrite") \
            .partitionBy("year", "month") \
            .format(file_format)
        if file_format == "csv":
            # Only the CSV output carries a header row.
            writer = writer.option("header", True)
        writer.save(f"{base_path}/{file_format}")


def _WriteLookupTable(spark, rows, columns, path):
    """Create a small DataFrame from ``rows`` and overwrite ``path`` with it as parquet."""
    spark.createDataFrame(rows, columns) \
        .repartition(1) \
        .write \
        .mode("overwrite") \
        .parquet(path)


def ConvertTaxiData(sourceYellow, sourceGreen, destination):
    """Convert the monthly NYC yellow/green taxi files into partitioned tables.

    For each color, every monthly parquet file from 2019-01 through 2024-02 is
    loaded through ProcessYellowFile / ProcessGreenFile, unioned, tagged with
    ``year`` and ``month`` columns, filtered to that date range and written as
    parquet, csv and json. Four static lookup tables (vendor, rate code,
    payment type, trip type) are then written as parquet.

    Args:
        sourceYellow: Location of the yellow taxi files. A trailing
            ``/*/*.parquet`` glob is removed to obtain the base folder.
        sourceGreen: Same as ``sourceYellow`` for the green taxi files.
        destination: Output prefix. Sub-paths are appended by plain string
            concatenation, so it must already end with ``/``.

    The Spark session is stopped when the function exits, including when a
    read or write raises.
    """
    print("ConvertTaxiData: sourceYellow: ",sourceYellow)
    print("ConvertTaxiData: sourceGreen: ",sourceGreen)
    print("ConvertTaxiData: destination: ",destination)

    spark = SparkSession \
        .builder \
        .appName("ConvertTaxiData") \
        .getOrCreate()

    try:
        ########################################################################
        # Yellow, then Green trip tables
        ########################################################################
        trip_sources = (
            ("yellow", sourceYellow, ProcessYellowFile),
            ("green", sourceGreen, ProcessGreenFile),
        )
        for color, source, process_file in trip_sources:
            base_path = source.replace("/*/*.parquet","")
            trips = _LoadTrips(spark, process_file, base_path, color)
            df_with_partition_cols = _AddPartitionColumns(trips)
            _WriteTrips(df_with_partition_cols, destination + color + "/trips_table")

        ########################################################################
        # Create Common Tables
        ########################################################################
        for folder, columns, rows in _LOOKUP_TABLES:
            _WriteLookupTable(spark, rows, columns, destination + folder)
    finally:
        # Always release the session, even if a read or write failed.
        spark.stop()