in 003-DrivingMissData/Coach/Solutions/Challenge02/06-LoadData-Green.scala [202:306]
/**
 * Renames and derives columns of a raw green-taxi trip DataFrame so that data from every
 * supported period ends up with one shared set of column names.
 *
 * Which mapping is applied depends on the period:
 *  - 2017 onward, or 2016 with `tripMonth > 6`: renames `RatecodeID`, `PULocationID`,
 *    `DOLocationID` and adds empty-string coordinate columns.
 *  - 2015, or 2016 with `tripMonth <= 6`: renames the capitalised legacy columns, casts the
 *    coordinate columns to strings and adds zero location ids.
 *  - 2013 and 2014: as above, plus a zero `improvement_surcharge` column.
 *
 * Every mapping also adds `trip_year`, `trip_month` (sliced from the raw
 * `lpep_pickup_datetime` text) and `taxi_type = "green"`.
 *
 * @param sourceDF  raw trip data; must contain `lpep_pickup_datetime`. Its text is assumed to
 *                  start with `yyyy-MM` (TODO confirm against the loader).
 * @param tripYear  year of the data in `sourceDF`; selects the column mapping.
 * @param tripMonth month of the data; only consulted when `tripYear == 2016`.
 * @return `sourceDF` with normalized column names and the derived columns appended.
 * @throws IllegalArgumentException if `tripYear` is before 2013 (no mapping is defined).
 */
def GetSchemaHomogenizedDataframe(sourceDF: org.apache.spark.sql.DataFrame,
                                  tripYear: Int,
                                  tripMonth: Int): org.apache.spark.sql.DataFrame = {
  type DF = org.apache.spark.sql.DataFrame

  // Applies (from -> to) renames in order; renamed columns keep their position.
  def renameAll(df: DF, renames: Seq[(String, String)]): DF =
    renames.foldLeft(df) { case (acc, (from, to)) => acc.withColumnRenamed(from, to) }

  // Columns every mapping gets. trip_year/trip_month are sliced from the pickup text before
  // that column is renamed. Spark's substring position is 1-based.
  def withCommonColumns(df: DF): DF =
    df.withColumn("trip_year", substring(col("lpep_pickup_datetime"), 1, 4))
      .withColumn("trip_month", substring(col("lpep_pickup_datetime"), 6, 2))
      .withColumn("taxi_type", lit("green"))
      .withColumnRenamed("VendorID", "vendor_id")
      .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")

  // Replaces a numeric coordinate column with a string copy under `name`. Going through a
  // temp column avoids a name clash with the legacy column and appends the result at the end.
  def coordinateAsString(df: DF, legacyName: String, name: String): DF = {
    val tempName = s"temp_$name"
    df.withColumn(tempName, col(legacyName).cast(StringType))
      .drop(legacyName)
      .withColumnRenamed(tempName, name)
  }

  // 2016 H2 onward: location ids instead of coordinates.
  def modernLayout(df: DF): DF = {
    val renamed = renameAll(withCommonColumns(df), Seq(
      "lpep_dropoff_datetime" -> "dropoff_datetime",
      "RatecodeID" -> "rate_code_id",
      "PULocationID" -> "pickup_location_id",
      "DOLocationID" -> "dropoff_location_id"
    ))
    // No coordinates in this mapping; add empty-string placeholders so the column set
    // matches the legacy mappings' string coordinate columns.
    Seq("pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude")
      .foldLeft(renamed)((acc, name) => acc.withColumn(name, lit("")))
  }

  // 2013 through 2016 H1: capitalised names, coordinates, no location ids.
  def legacyLayout(df: DF): DF = {
    val renamed = renameAll(withCommonColumns(df), Seq(
      "Lpep_dropoff_datetime" -> "dropoff_datetime",
      "Passenger_count" -> "passenger_count",
      "Trip_distance" -> "trip_distance",
      "RateCodeID" -> "rate_code_id",
      "Store_and_fwd_flag" -> "store_and_fwd_flag",
      "Payment_type" -> "payment_type",
      "Fare_amount" -> "fare_amount",
      "Extra" -> "extra",
      "MTA_tax" -> "mta_tax",
      "Tip_amount" -> "tip_amount",
      "Tolls_amount" -> "tolls_amount",
      "Ehail_fee" -> "ehail_fee",
      "Total_amount" -> "total_amount",
      "Trip_type" -> "trip_type"
    ))
      .withColumn("pickup_location_id", lit(0).cast(IntegerType))
      .withColumn("dropoff_location_id", lit(0).cast(IntegerType))
    Seq(
      "Pickup_longitude" -> "pickup_longitude",
      "Pickup_latitude" -> "pickup_latitude",
      "Dropoff_longitude" -> "dropoff_longitude",
      "Dropoff_latitude" -> "dropoff_latitude"
    ).foldLeft(renamed) { case (acc, (legacy, name)) => coordinateAsString(acc, legacy, name) }
  }

  tripYear match {
    case y if y >= 2017 || (y == 2016 && tripMonth > 6) =>
      modernLayout(sourceDF)
    // 2016 reaches this case only when tripMonth <= 6.
    // The self-rename only matters if the column's casing differs and Spark resolves names
    // case-insensitively (its default), in which case it normalizes the casing.
    case 2015 | 2016 =>
      legacyLayout(sourceDF).withColumnRenamed("improvement_surcharge", "improvement_surcharge")
    case 2013 | 2014 =>
      legacyLayout(sourceDF).withColumn("improvement_surcharge", lit(0).cast(DoubleType))
    case _ =>
      // Previously returned null here, which only failed later with an NPE.
      throw new IllegalArgumentException(
        s"No green-taxi schema mapping for tripYear=$tripYear, tripMonth=$tripMonth (supported: 2013 onward)")
  }
}