in dataproc/convert_taxi_to_parquet.py [0:0]
def ProcessGreenFile(spark, filenameAndPath):
    """Load green-taxi Parquet data and normalize its columns.

    Reads ``filenameAndPath`` with the ``mergeSchema`` option enabled, then
    casts and renames the raw columns to the standardized names and types
    listed in ``column_casts`` below. Only those columns are returned, in
    that order.

    Args:
        spark: Active SparkSession used to read the Parquet data.
        filenameAndPath: Location passed to ``spark.read.parquet``.

    Returns:
        A DataFrame containing exactly the normalized columns from
        ``column_casts``.
    """
    # Log before reading so the path is visible even if the read fails.
    print("ProcessGreenFile: filenameAndPath: ", filenameAndPath)
    df_source = spark.read.option("mergeSchema", "true").parquet(filenameAndPath)

    # (output column, source column, target type), listed in output order.
    # Keeping each output name in exactly one place prevents spelling drift
    # between definition and selection (e.g. "Vendor_ID" vs "Vendor_Id"),
    # which breaks when spark.sql.caseSensitive is enabled.
    column_casts = [
        ("Vendor_ID", "VendorID", IntegerType()),
        ("Pickup_DateTime", "lpep_pickup_datetime", TimestampType()),
        ("Dropoff_DateTime", "lpep_dropoff_datetime", TimestampType()),
        ("Store_And_Forward", "store_and_fwd_flag", StringType()),
        ("Rate_Code_Id", "RatecodeID", IntegerType()),
        ("PULocationID", "PULocationID", IntegerType()),
        ("DOLocationID", "DOLocationID", IntegerType()),
        ("Passenger_Count", "passenger_count", IntegerType()),
        ("Trip_Distance", "trip_distance", DoubleType()),
        ("Fare_Amount", "fare_amount", DoubleType()),
        ("Surcharge", "extra", DoubleType()),
        ("MTA_Tax", "mta_tax", DoubleType()),
        ("Tip_Amount", "tip_amount", DoubleType()),
        ("Tolls_Amount", "tolls_amount", DoubleType()),
        ("Ehail_Fee", "ehail_fee", DoubleType()),
        ("Improvement_Surcharge", "improvement_surcharge", DoubleType()),
        ("Total_Amount", "total_amount", DoubleType()),
        ("Payment_Type_Id", "payment_type", IntegerType()),
        ("Trip_Type", "trip_type", IntegerType()),
        ("Congestion_Surcharge", "congestion_surcharge", DoubleType()),
    ]

    # One select yields a single projection. Chaining a withColumn per
    # column adds one projection each, which bloats the logical plan.
    df_result = df_source.select(
        *[
            col(source).cast(dtype).alias(target)
            for target, source, dtype in column_casts
        ]
    )
    return df_result