in spark-application/src/main/scala/ValueZones.scala [107:167]
/**
 * Reads yellow and green taxi ride files, enriches every ride with its pickup
 * zone, and persists two parquet-backed tables in database `dbName`:
 *
 *  - `raw_rides`   (at `target/raw-rides`): every enriched ride, partitioned by
 *    pickup year and month.
 *  - `value_rides` (at `target/value-rides`): per pickup zone and pickup hour,
 *    the average per-minute fare and the ride count, split by taxi colour.
 *
 * Both tables are written in overwrite mode. The Spark session time zone is set
 * to America/New_York before any data is read.
 *
 * @param yellow paths handed to `readYellow`; may be empty if `green` is not
 * @param green  paths handed to `readGreen`; may be empty if `yellow` is not
 * @param zones  path of a CSV file with a header row; the job uses its
 *               `LocationID`, `Borough` and `Zone` columns
 * @param target base location under which both tables are stored
 * @param dbName database that receives the tables (created when missing)
 * @throws IllegalArgumentException when both `yellow` and `green` are empty
 */
def runJob( yellow :List[String], green :List[String], zones :String, target :String, dbName :String): Unit = {
  require(yellow.nonEmpty || green.nonEmpty, "runJob needs at least one yellow or green input path")
  logger.info("Execution started")
  import session.implicits._
  session.conf.set("spark.sql.session.timeZone", "America/New_York")

  // Tag every source with its own colour BEFORE the union. Tagging the unioned
  // data (as was done previously) labels all rides with the same colour and
  // leaves one side of the pivot below permanently empty.
  val yellowRides = yellow.map(path => readYellow(path).withColumn("taxiColor", lit("yellow")))
  val greenRides = green.map(path => readGreen(path).withColumn("taxiColor", lit("green")))
  // Non-empty thanks to the `require` above, so reduceLeft cannot fail here.
  val allRides = (yellowRides ++ greenRides).reduceLeft((a, b) => a.union(b))

  val zonesInfo = session.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("enforceSchema", "false")
    .option("columnNameOfCorruptRecord", "error")
    .csv(zones)

  val allEventsWithZone = allRides
    // Ride length in seconds.
    .withColumn("duration", unix_timestamp($"dropoff_datetime").minus(unix_timestamp($"pickup_datetime")))
    // Fare earned per minute of ride.
    .withColumn("minute_rate", $"total_amount".divide($"duration") * 60)
    // Inner join: rides whose pickup location is not listed in the zones file are dropped.
    .join(zonesInfo, $"PULocationID" === $"LocationID")

  // The enriched rides feed two independent writes, so keep them cached while
  // both run and release the cache afterwards, even when a write fails.
  allEventsWithZone.cache()
  try {
    val zoneAttractiveness = allEventsWithZone
      .select("pickup_datetime", "minute_rate", "taxiColor", "LocationID", "Borough", "Zone")
      .groupBy($"LocationID", date_trunc("hour", $"pickup_datetime") as "pickup_hour")
      .pivot("taxiColor", Seq("yellow", "green"))
      .agg("minute_rate" -> "avg", "minute_rate" -> "count")
      // Replace the generated "<colour>_<agg>(minute_rate)" names with plain identifiers.
      .withColumnRenamed("yellow_avg(minute_rate)", "yellow_avg_minute_rate")
      .withColumnRenamed("yellow_count(minute_rate)", "yellow_count")
      .withColumnRenamed("green_avg(minute_rate)", "green_avg_minute_rate")
      .withColumnRenamed("green_count(minute_rate)", "green_count")

    session.sql(s"CREATE DATABASE IF NOT EXISTS `$dbName`")
    session.sql(s"use `$dbName`")

    allEventsWithZone
      .withColumn("year", year($"pickup_datetime"))
      .withColumn("month", month($"pickup_datetime"))
      .withColumn("day", dayofmonth($"pickup_datetime"))
      // Co-locate each (year, month) so every output partition is written by one task.
      .repartition($"year", $"month")
      .sortWithinPartitions("day")
      .write
      .format("parquet")
      .mode("OVERWRITE")
      .partitionBy("year", "month")
      .option("path", s"$target/raw-rides")
      .saveAsTable("raw_rides")

    zoneAttractiveness
      // Single partition: the aggregate is written as one file ordered by hour.
      .repartition(1)
      .sortWithinPartitions($"pickup_hour")
      .write
      .format("parquet")
      .mode("OVERWRITE")
      .option("path", s"$target/value-rides")
      .saveAsTable("value_rides")
  } finally {
    allEventsWithZone.unpersist()
  }
}