def runJob()

in spark-application/src/main/scala/ValueZones.scala [107:167]


  /** Runs the taxi value-zones batch job.
    *
    * Reads yellow and green trip files, joins each ride with the zone lookup
    * table, computes a dollars-per-minute rate per ride, and persists two
    * Hive-registered parquet tables under `target`:
    *   - `raw_rides`: every ride, partitioned by year/month for time pruning
    *   - `value_rides`: hourly per-zone average minute-rate, pivoted by colour
    *
    * @param yellow paths of the yellow-taxi trip input files (must be non-empty)
    * @param green  paths of the green-taxi trip input files (must be non-empty)
    * @param zones  path of the zone lookup CSV (expects a header row)
    * @param target base output path for both parquet tables
    * @param dbName Hive database to create (if absent) and write into
    */
  def runJob( yellow :List[String], green :List[String], zones :String, target :String, dbName :String): Unit = {

    logger.info("Execution started")
    import session.implicits._
    session.conf.set("spark.sql.session.timeZone", "America/New_York")

    // Each colour may span several input files; union them into one DataFrame.
    // NOTE(review): reduceLeft throws on an empty path list — assumed non-empty,
    // and readYellow/readGreen are assumed to produce identically-ordered
    // schemas, since union() matches columns by position, not by name.
    val yellowRides = yellow.map(readYellow).reduceLeft(_ union _)

    val greenRides = green.map(readGreen).reduceLeft(_ union _)

    val zonesInfo = session.read
      .option("header","true")
      .option("inferSchema", "true")
      .option("enforceSchema", "false")
      .option("columnNameOfCorruptRecord", "error")
      .csv(zones)

    // BUG FIX: taxiColor was previously added AFTER the union, so every ride
    // (yellow included) was labelled "green" and the yellow pivot columns
    // below were always null. Tag each source with its colour BEFORE unioning.
    val allEventsWithZone = yellowRides.withColumn("taxiColor", lit("yellow"))
      .union(greenRides.withColumn("taxiColor", lit("green")))
      .withColumn("duration", unix_timestamp($"dropoff_datetime").minus(unix_timestamp($"pickup_datetime")))
      // Dollars per minute; a zero duration divides to null in Spark SQL.
      .withColumn("minute_rate",$"total_amount".divide($"duration") * 60)
      .join(zonesInfo,$"PULocationID" === $"LocationID")

    // Reused by both writes below — cache to avoid recomputing the joins.
    allEventsWithZone.cache()

    val zoneAttractiveness = allEventsWithZone
      .select("pickup_datetime","minute_rate","taxiColor","LocationID","Borough", "Zone")
      .groupBy($"LocationID", date_trunc("hour",$"pickup_datetime") as "pickup_hour")
      .pivot("taxiColor",Seq("yellow", "green"))
      .agg("minute_rate" -> "avg", "minute_rate" -> "count")
      // Flatten Spark's generated pivot names into stable, queryable columns.
      .withColumnRenamed("yellow_avg(minute_rate)","yellow_avg_minute_rate")
      .withColumnRenamed("yellow_count(minute_rate)","yellow_count")
      .withColumnRenamed("green_avg(minute_rate)","green_avg_minute_rate")
      .withColumnRenamed("green_count(minute_rate)","green_count")

    session.sql(s"CREATE DATABASE IF NOT EXISTS `$dbName`")
    session.sql(s"use `$dbName`")

    // Full ride detail, partitioned by year/month so readers can prune by date.
    // (saveAsTable returns Unit — the old `val rawQuery =` binding was dropped.)
    allEventsWithZone
      .withColumn("year", year($"pickup_datetime"))
      .withColumn("month", month($"pickup_datetime"))
      .withColumn("day", dayofmonth($"pickup_datetime"))
      .repartition($"year",$"month")
      .sortWithinPartitions("day")
      .write
      .format("parquet")
      .mode("OVERWRITE")
      .partitionBy("year","month")
      .option("path", s"$target/raw-rides")
      .saveAsTable("raw_rides")

    // The hourly aggregate is small: emit a single file, sorted by hour.
    zoneAttractiveness
      .repartition(1)
      .sortWithinPartitions($"pickup_hour")
      .write
      .format("parquet")
      .mode("OVERWRITE")
      .option("path", s"$target/value-rides")
      .saveAsTable("value_rides")

    // Both consumers are done; release the cached join.
    allEventsWithZone.unpersist()
  }