# dataproc/convert_taxi_to_parquet.py
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Opens the monthly taxi Parquet files, renames and casts the fields to a common schema, and partitions the data by year/month.
# Note: NYC changed the published data from CSV to Parquet format; this file was updated on 05/16/2022 accordingly.
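# Expected input layout (see the paths built below): <source>/<year>/yellow_tripdata_YYYY-MM.parquet and the green equivalent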
from functools import reduce
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType
import sys
# The files cannot all be read at once: mergeSchema fails across the NYC files because some column types change between months, so each monthly file is read and normalized individually
def ProcessGreenFile(spark, filenameAndPath):
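    """Read one monthly green-taxi Parquet file and cast every column to a stable name and type."""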
df_source = spark.read.option("mergeSchema", "true").parquet(filenameAndPath)
print("ProcessGreenFile: filenameAndPath: ",filenameAndPath)
df_TypeCast = df_source \
.withColumn("Vendor_ID",col("VendorID").cast(IntegerType())) \
.withColumn("Pickup_DateTime",col("lpep_pickup_datetime").cast(TimestampType())) \
.withColumn("Dropoff_DateTime",col("lpep_dropoff_datetime").cast(TimestampType())) \
.withColumn("Store_And_Forward",col("store_and_fwd_flag").cast(StringType())) \
.withColumn("Rate_Code_Id",col("RatecodeID").cast(IntegerType())) \
.withColumn("PULocationID",col("PULocationID").cast(IntegerType())) \
.withColumn("DOLocationID",col("DOLocationID").cast(IntegerType())) \
.withColumn("Passenger_Count",col("passenger_count").cast(IntegerType())) \
.withColumn("Trip_Distance",col("trip_distance").cast(DoubleType())) \
.withColumn("Fare_Amount",col("fare_amount").cast(DoubleType())) \
.withColumn("Surcharge",col("extra").cast(DoubleType())) \
.withColumn("MTA_Tax",col("mta_tax").cast(DoubleType())) \
.withColumn("Tip_Amount",col("tip_amount").cast(DoubleType())) \
.withColumn("Tolls_Amount",col("tolls_amount").cast(DoubleType())) \
.withColumn("Ehail_Fee",col("ehail_fee").cast(DoubleType())) \
.withColumn("Improvement_Surcharge",col("improvement_surcharge").cast(DoubleType())) \
.withColumn("Total_Amount",col("total_amount").cast(DoubleType())) \
.withColumn("Payment_Type_Id",col("payment_type").cast(IntegerType())) \
.withColumn("Trip_Type",col("trip_type").cast(IntegerType())) \
.withColumn("Congestion_Surcharge",col("congestion_surcharge").cast(DoubleType()))
    df_result = df_TypeCast.select(
        'Vendor_Id',
        'Pickup_DateTime',
        'Dropoff_DateTime',
        'Store_And_Forward',
        'Rate_Code_Id',
        'PULocationID',
        'DOLocationID',
        'Passenger_Count',
        'Trip_Distance',
        'Fare_Amount',
        'Surcharge',
        'MTA_Tax',
        'Tip_Amount',
        'Tolls_Amount',
        'Ehail_Fee',
        'Improvement_Surcharge',
        'Total_Amount',
        'Payment_Type_Id',
        'Trip_Type',
        'Congestion_Surcharge'
    )
return df_result
# As above, each monthly yellow file is read and normalized individually because mergeSchema does not work across the NYC files
def ProcessYellowFile(spark, filenameAndPath):
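    """Read one monthly yellow-taxi Parquet file and cast every column to a stable name and type."""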
df_source = spark.read.parquet(filenameAndPath)
print("ProcessYellowFile: filenameAndPath: ",filenameAndPath)
df_TypeCast = df_source \
.withColumn("Vendor_ID",col("VendorID").cast(IntegerType())) \
.withColumn("Pickup_DateTime",col("tpep_pickup_datetime").cast(TimestampType())) \
.withColumn("Dropoff_DateTime",col("tpep_dropoff_datetime").cast(TimestampType())) \
.withColumn("Passenger_Count",col("passenger_count").cast(IntegerType())) \
.withColumn("Trip_Distance",col("trip_distance").cast(DoubleType())) \
.withColumn("Rate_Code_Id",col("RatecodeID").cast(IntegerType())) \
.withColumn("Store_And_Forward",col("store_and_fwd_flag").cast(StringType())) \
.withColumn("PULocationID",col("PULocationID").cast(IntegerType())) \
.withColumn("DOLocationID",col("DOLocationID").cast(IntegerType())) \
.withColumn("Payment_Type_Id",col("payment_type").cast(IntegerType())) \
.withColumn("Fare_Amount",col("fare_amount").cast(DoubleType())) \
.withColumn("Surcharge",col("extra").cast(DoubleType())) \
.withColumn("MTA_Tax",col("mta_tax").cast(DoubleType())) \
.withColumn("Tip_Amount",col("tip_amount").cast(DoubleType())) \
.withColumn("Tolls_Amount",col("tolls_amount").cast(DoubleType())) \
.withColumn("Improvement_Surcharge",col("improvement_surcharge").cast(DoubleType())) \
.withColumn("Total_Amount",col("total_amount").cast(DoubleType())) \
.withColumn("Congestion_Surcharge",col("congestion_surcharge").cast(DoubleType()))
#.withColumn("Airport_Fee",col("airport_fee").cast(DoubleType()))
    df_result = df_TypeCast.select(
        'Vendor_Id',
        'Pickup_DateTime',
        'Dropoff_DateTime',
        'Passenger_Count',
        'Trip_Distance',
        'Rate_Code_Id',
        'Store_And_Forward',
        'PULocationID',
        'DOLocationID',
        'Payment_Type_Id',
        'Fare_Amount',
        'Surcharge',
        'MTA_Tax',
        'Tip_Amount',
        'Tolls_Amount',
        'Improvement_Surcharge',
        'Total_Amount',
        'Congestion_Surcharge'
    )
return df_result
def ConvertTaxiData(sourceYellow, sourceGreen, destination):
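    """Normalize the yellow and green trip data, write Parquet/CSV/JSON copies partitioned by year/month, and create the lookup tables."""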
print("ConvertTaxiData: sourceYellow: ",sourceYellow)
print("ConvertTaxiData: sourceGreen: ",sourceGreen)
print("ConvertTaxiData: destination: ",destination)
spark = SparkSession \
.builder \
.appName("ConvertTaxiData") \
.getOrCreate()
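    # getOrCreate() reuses an existing session if one is already running; the Spark configuration comes from the cluster defaults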
################################################################################################
# Yellow
################################################################################################
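    # Accept either the bare source prefix or the globbed form shown in the sample invocation at the bottom of this file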
yellow_path = sourceYellow.replace("/*/*.parquet","")
    # The NYC taxi data has fields that change data types between months, and some are incompatible with mergeSchema
    # 2019-01 through 2024-02: read each monthly file individually, then merge everything together
    # (all the dataframes share the same schema after normalization, so a plain union works)
    yellow_months = [(y, m) for y in range(2019, 2024) for m in range(1, 13)] + [(2024, 1), (2024, 2)]
    yellow_dataframes = [ProcessYellowFile(spark, f"{yellow_path}/{y}/yellow_tripdata_{y}-{m:02d}.parquet")
                         for (y, m) in yellow_months]
    df_yellow_final = reduce(DataFrame.union, yellow_dataframes)
    # Derive the partition columns and clamp to the expected window (the raw data contains stray out-of-range pickup timestamps).
    # Use an exclusive upper bound: "<= '2024-02-29'" would drop every trip after midnight on Feb 29.
    df_with_partition_cols = df_yellow_final \
        .withColumn("year", year(col("Pickup_DateTime"))) \
        .withColumn("month", month(col("Pickup_DateTime"))) \
        .filter("Pickup_DateTime >= '2019-01-01' AND Pickup_DateTime < '2024-03-01'")
# Write as Parquet
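    # repartition(5) caps the job at 5 write tasks, so each year/month directory gets at most 5 files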
    df_with_partition_cols \
        .repartition(5) \
        .write \
        .mode("overwrite") \
        .partitionBy("year","month") \
        .parquet(destination + "yellow/trips_table/parquet")
# Write as CSV
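    # Unlike Parquet, the CSV and JSON copies carry no type information; readers must re-declare the schema when loading them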
    df_with_partition_cols \
        .repartition(5) \
        .write \
        .mode("overwrite") \
        .partitionBy("year","month") \
        .format("csv") \
        .option('header',True) \
        .save(destination + "yellow/trips_table/csv")
# Write as JSON
    df_with_partition_cols \
        .repartition(5) \
        .write \
        .mode("overwrite") \
        .partitionBy("year","month") \
        .format("json") \
        .save(destination + "yellow/trips_table/json")
################################################################################################
# Green
################################################################################################
green_path = sourceGreen.replace("/*/*.parquet","")
    # The NYC taxi data has fields that change data types between months, and some are incompatible with mergeSchema
    # 2019-01 through 2024-02: same per-month read-then-union pattern as the yellow data
    green_months = [(y, m) for y in range(2019, 2024) for m in range(1, 13)] + [(2024, 1), (2024, 2)]
    green_dataframes = [ProcessGreenFile(spark, f"{green_path}/{y}/green_tripdata_{y}-{m:02d}.parquet")
                        for (y, m) in green_months]
    df_green_final = reduce(DataFrame.union, green_dataframes)
    # Same partition columns and date clamp as the yellow data (exclusive upper bound keeps all of 2024-02-29)
    df_with_partition_cols = df_green_final \
        .withColumn("year", year(col("Pickup_DateTime"))) \
        .withColumn("month", month(col("Pickup_DateTime"))) \
        .filter("Pickup_DateTime >= '2019-01-01' AND Pickup_DateTime < '2024-03-01'")
# Write as Parquet
    df_with_partition_cols \
        .repartition(5) \
        .write \
        .mode("overwrite") \
        .partitionBy("year","month") \
        .parquet(destination + "green/trips_table/parquet")
# Write as CSV
    df_with_partition_cols \
        .repartition(5) \
        .write \
        .mode("overwrite") \
        .partitionBy("year","month") \
        .format("csv") \
        .option('header',True) \
        .save(destination + "green/trips_table/csv")
# Write as JSON
    df_with_partition_cols \
        .repartition(5) \
        .write \
        .mode("overwrite") \
        .partitionBy("year","month") \
        .format("json") \
        .save(destination + "green/trips_table/json")
################################################################################################
# Create Common Tables
################################################################################################
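    # Small lookup (dimension) tables; the code values follow the NYC TLC trip records data dictionary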
# Vendor Table
vendor_df = spark.createDataFrame(
[
(1, "Creative Mobile Technologies"),
(2, "VeriFone")
],
["Vendor_Id", "Vendor_Description"]
)
    vendor_df \
        .repartition(1) \
        .write \
        .mode("overwrite") \
        .parquet(destination + "vendor_table/")
# Rate Code Table
rate_code_df = spark.createDataFrame(
[
(1, "Standard rate"),
(2, "JFK"),
(3, "Newark"),
(4, "Nassau or Westchester"),
(5, "Negotiated fare"),
(6, "Group ride"),
],
["Rate_Code_Id", "Rate_Code_Description"]
)
    rate_code_df \
        .repartition(1) \
        .write \
        .mode("overwrite") \
        .parquet(destination + "rate_code_table/")
# Payment Type Table
payment_type_df = spark.createDataFrame(
[
(1, "Credit card"),
(2, "Cash"),
(3, "No charge"),
(4, "Dispute"),
(5, "Unknown"),
(6, "Voided trip"),
],
["Payment_Type_Id", "Payment_Type_Description"]
)
    payment_type_df \
        .repartition(1) \
        .write \
        .mode("overwrite") \
        .parquet(destination + "payment_type_table/")
# Trip Type Table
trip_type_df = spark.createDataFrame(
[
(1, "Street-hail"),
(2, "Dispatch")
],
["Trip_Type_Id", "Trip_Type_Description"]
)
    trip_type_df \
        .repartition(1) \
        .write \
        .mode("overwrite") \
        .parquet(destination + "trip_type_table/")
spark.stop()
# Main entry point
# convert_taxi_to_parquet gs://big-query-demo-09/test-taxi/yellow gs://big-query-demo-09/test-taxi/green gs://big-query-demo-09/test-taxi-output
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: convert_taxi_to_parquet sourceYellow sourceGreen destination")
sys.exit(-1)
sourceYellow = sys.argv[1]
sourceGreen = sys.argv[2]
destination = sys.argv[3]
print ("BEGIN: Main")
ConvertTaxiData(sourceYellow, sourceGreen, destination)
print ("END: Main")
# Sample run
# gcloud dataproc jobs submit pyspark \
# --cluster "testcluster" \
# --region="REPLACE-REGION" \
# gs://big-query-demo-09/pyspark-code/convert_taxi_to_parquet.py \
# -- gs://big-query-demo-09/test-taxi/yellow/*/*.parquet \
# gs://big-query-demo-09/test-taxi/green/*/*.parquet \
# gs://big-query-demo-09/test-taxi/dest/