def GetSchemaHomogenizedDataframe()

in 003-DrivingMissData/Coach/Solutions/Challenge02/06-LoadData-Green.scala [202:306]


/**
 * Reshapes a raw green-taxi trip DataFrame into one common column layout, whatever
 * period the source file comes from.
 *
 * Three source layouts are handled, selected by trip period:
 *  - July 2016 onwards: `PULocationID` / `DOLocationID` are renamed to
 *    `pickup_location_id` / `dropoff_location_id`, and the four coordinate columns
 *    are added as empty strings.
 *  - 2015 through June 2016: the capitalised source columns are renamed to snake_case,
 *    the coordinate columns are cast to string, and the two location-id columns are
 *    added as integer 0.
 *  - 2013 and 2014: same as the previous layout, except `improvement_surcharge` is
 *    added as a double 0 instead of being taken from the source.
 *
 * Every layout also gains `trip_year` and `trip_month` (substrings of
 * `lpep_pickup_datetime`) and a constant `taxi_type` of "green".
 *
 * @param sourceDF  raw trip data for a single period
 * @param tripYear  year of the period; selects the source layout
 * @param tripMonth month of the period; only consulted for 2016, where the layout
 *                  changes after June
 * @return the DataFrame with homogenized column names
 * @throws IllegalArgumentException if the period is earlier than 2013; the previous
 *                                  implementation silently returned null in that case
 */
def GetSchemaHomogenizedDataframe(sourceDF: org.apache.spark.sql.DataFrame,
                                  tripYear: Int, 
                                  tripMonth: Int): org.apache.spark.sql.DataFrame =
{
  // Applies a list of (existing name -> new name) renames, in order.
  def renameAll(df: org.apache.spark.sql.DataFrame,
                renames: Seq[(String, String)]): org.apache.spark.sql.DataFrame =
    renames.foldLeft(df) { case (acc, (from, to)) => acc.withColumnRenamed(from, to) }

  // Casts `from` to string and exposes it as `to`, going through a "temp_" column.
  // NOTE(review): the temp-column detour is kept exactly as in the original; it presumably
  // exists because source and target names differ only by case. Confirm before simplifying,
  // since it also determines where the column lands in the resulting schema.
  def castToString(df: org.apache.spark.sql.DataFrame,
                   from: String,
                   to: String): org.apache.spark.sql.DataFrame =
  {
    val scratch = "temp_" + to
    df.withColumn(scratch, col(from).cast(StringType))
      .drop(from)
      .withColumnRenamed(scratch, to)
  }

  // Columns every layout receives. Declared as a def so nothing is evaluated for an
  // unsupported period. Spark SQL substring positions are 1-based.
  def tagged: org.apache.spark.sql.DataFrame =
    sourceDF
      .withColumn("trip_year", substring(col("lpep_pickup_datetime"), 1, 4))
      .withColumn("trip_month", substring(col("lpep_pickup_datetime"), 6, 2))
      .withColumn("taxi_type", lit("green"))

  // Layout used from July 2016 onwards: location ids present, coordinates absent.
  // passenger_count, trip_distance, store_and_fwd_flag, payment_type, fare_amount, extra,
  // mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount and
  // trip_type are passed through without renaming.
  def fromLocationIdLayout: org.apache.spark.sql.DataFrame =
    renameAll(tagged, Seq(
      "VendorID"              -> "vendor_id",
      "lpep_pickup_datetime"  -> "pickup_datetime",
      "lpep_dropoff_datetime" -> "dropoff_datetime",
      "RatecodeID"            -> "rate_code_id",
      "PULocationID"          -> "pickup_location_id",
      "DOLocationID"          -> "dropoff_location_id"))
      .withColumn("pickup_longitude", lit(""))
      .withColumn("pickup_latitude", lit(""))
      .withColumn("dropoff_longitude", lit(""))
      .withColumn("dropoff_latitude", lit(""))

  // Layout used through June 2016: coordinates present, location ids absent.
  // `synthesizeSurcharge` is true for the 2013-2014 files, where improvement_surcharge
  // is filled with 0 rather than taken from the source.
  def fromCoordinateLayout(synthesizeSurcharge: Boolean): org.apache.spark.sql.DataFrame =
  {
    val identified = renameAll(tagged, Seq(
      "VendorID"              -> "vendor_id",
      "lpep_pickup_datetime"  -> "pickup_datetime",
      "Lpep_dropoff_datetime" -> "dropoff_datetime",
      "Passenger_count"       -> "passenger_count",
      "Trip_distance"         -> "trip_distance",
      "RateCodeID"            -> "rate_code_id",
      "Store_and_fwd_flag"    -> "store_and_fwd_flag"))
      .withColumn("pickup_location_id", lit(0).cast(IntegerType))
      .withColumn("dropoff_location_id", lit(0).cast(IntegerType))

    val located = Seq(
      "Pickup_longitude"  -> "pickup_longitude",
      "Pickup_latitude"   -> "pickup_latitude",
      "Dropoff_longitude" -> "dropoff_longitude",
      "Dropoff_latitude"  -> "dropoff_latitude")
      .foldLeft(identified) { case (acc, (from, to)) => castToString(acc, from, to) }

    val priced = renameAll(located, Seq(
      "Payment_type" -> "payment_type",
      "Fare_amount"  -> "fare_amount",
      "Extra"        -> "extra",
      "MTA_tax"      -> "mta_tax",
      "Tip_amount"   -> "tip_amount",
      "Tolls_amount" -> "tolls_amount",
      "Ehail_fee"    -> "ehail_fee"))

    val surcharged =
      if (synthesizeSurcharge)
        priced.withColumn("improvement_surcharge", lit(0).cast(DoubleType))
      else
        // Self-rename kept from the original (presumably normalises the header's casing
        // under case-insensitive column resolution -- TODO confirm).
        priced.withColumnRenamed("improvement_surcharge", "improvement_surcharge")

    renameAll(surcharged, Seq(
      "Total_amount" -> "total_amount",
      "Trip_type"    -> "trip_type"))
  }

  (tripYear, tripMonth) match
  {
    case (year, month) if year >= 2017 || (year == 2016 && month > 6) =>
      fromLocationIdLayout

    // Any 2016 month reaching this case is <= 6; later months matched above.
    case (2016, _) | (2015, _) =>
      fromCoordinateLayout(synthesizeSurcharge = false)

    case (2014, _) | (2013, _) =>
      fromCoordinateLayout(synthesizeSurcharge = true)

    case _ =>
      // Fail loudly instead of handing a null DataFrame back to the caller.
      throw new IllegalArgumentException(
        s"Unsupported green-taxi trip period: year=$tripYear, month=$tripMonth (expected 2013 or later)")
  }
}