def ProcessYellowFile()

in dataproc/convert_taxi_to_parquet.py [0:0]


def ProcessYellowFile(spark, filenameAndPath):
    """Read one Parquet file and return it with renamed, explicitly typed columns.

    Args:
        spark: The SparkSession used to read the file.
        filenameAndPath: Path/URI of the Parquet file to read.

    Returns:
        A DataFrame containing exactly the output columns listed in
        ``column_casts`` below, in that order, each cast to the listed type.
        All other source columns are dropped.
    """
    # (source column, output column, target type). List order defines the
    # output column order.
    column_casts = [
        ("VendorID", "Vendor_ID", IntegerType()),
        ("tpep_pickup_datetime", "Pickup_DateTime", TimestampType()),
        ("tpep_dropoff_datetime", "Dropoff_DateTime", TimestampType()),
        ("passenger_count", "Passenger_Count", IntegerType()),
        ("trip_distance", "Trip_Distance", DoubleType()),
        ("RatecodeID", "Rate_Code_Id", IntegerType()),
        ("store_and_fwd_flag", "Store_And_Forward", StringType()),
        ("PULocationID", "PULocationID", IntegerType()),
        ("DOLocationID", "DOLocationID", IntegerType()),
        ("payment_type", "Payment_Type_Id", IntegerType()),
        ("fare_amount", "Fare_Amount", DoubleType()),
        ("extra", "Surcharge", DoubleType()),
        ("mta_tax", "MTA_Tax", DoubleType()),
        ("tip_amount", "Tip_Amount", DoubleType()),
        ("tolls_amount", "Tolls_Amount", DoubleType()),
        ("improvement_surcharge", "Improvement_Surcharge", DoubleType()),
        ("total_amount", "Total_Amount", DoubleType()),
        ("congestion_surcharge", "Congestion_Surcharge", DoubleType()),
        # NOTE(review): airport_fee is intentionally not projected; presumably
        # it is absent from some source files — confirm before adding
        # ("airport_fee", "Airport_Fee", DoubleType()).
    ]

    df_source = spark.read.parquet(filenameAndPath)

    print("ProcessYellowFile: filenameAndPath: ",filenameAndPath)

    # A single select keeps the logical plan to one projection (each chained
    # withColumn call adds another). Output names are spelled exactly as
    # created, so this also resolves with spark.sql.caseSensitive=true.
    df_result = df_source.select(
        *[col(src).cast(dtype).alias(dst) for src, dst, dtype in column_casts]
    )

    return df_result