def main()

in wayang-benchmark/src/main/scala/org/apache/wayang/multicontext/apps/tpch/Query1.scala [34:130]


  def main(args: Array[String]): Unit = {

    if (args.length != 2 && args.length != 4) {
      println("Usage: <lineitem-url> <delta> <optional-path-to-config-1> <optional-path-to-config-2>")
      System.exit(1)
    }

    println("TPC-H querying #1 in multi context wayang!")
    println("Scala version:")
    println(scala.util.Properties.versionString)

    val (configuration1, configuration2) = loadConfig(args.drop(2))

    val context1 = new MultiContext(configuration1)
      .withPlugin(Java.basicPlugin())
      .withPlugin(Spark.basicPlugin())
      .withTextFileSink("file:///tmp/out11")
    val context2 = new MultiContext(configuration2)
      .withPlugin(Java.basicPlugin())
      .withPlugin(Spark.basicPlugin())
      .withTextFileSink("file:///tmp/out12")

    val multiContextPlanBuilder = new MultiContextPlanBuilder(List(context1, context2)).withUdfJarsOf(classOf[Query1])

    // Example structure of lineitem file:
    // 1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the
    // 1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold
    // ...
    val lineItemFile = args(0)
    val delta = args(1).toInt

    multiContextPlanBuilder

      // Load lineitem file
      .forEach(_.readTextFile(lineItemFile))

      // Parse
      .forEach(_.map(s => LineItem.parseCsv(s)))

      // Filter line items
      .forEach(_.filter(t => t.shipDate <= CsvUtils.parseDate("1998-12-01") - delta))

      // Project line items
      .forEach(_.map(t => (t.returnFlag, t.lineStatus, t.quantity, t.extendedPrice, t.discount, t.tax)))

      // Calculate result fields
      .forEach(_.map { case (returnFlag, lineStatus, quantity, extendedPrice, discount, tax) =>
        Query1Utils.Result(
          returnFlag.toString,
          lineStatus.toString,
          quantity,
          extendedPrice,
          extendedPrice * (1 - discount),
          extendedPrice * (1 - discount) * (1 + tax),
          quantity,
          extendedPrice,
          discount,
          1
        )
      })

      // Aggregate line items
      .forEach(_.reduceByKey(
        result => (result.l_returnflag, result.l_linestatus),
        (r1, r2) => Query1Utils.Result(
          r1.l_returnflag,
          r1.l_linestatus,
          r1.sum_qty + r2.sum_qty,
          r1.sum_base_price + r2.sum_base_price,
          r1.sum_disc_price + r2.sum_disc_price,
          r1.sum_charge + r2.sum_charge,
          r1.avg_qty + r2.avg_qty,
          r1.avg_price + r2.avg_price,
          r1.avg_disc + r2.avg_disc,
          r1.count_order + r2.count_order
        )
      ))

      // Post-process line items aggregates
      .forEach(_.map(result => Query1Utils.Result(
        result.l_returnflag,
        result.l_linestatus,
        result.sum_qty,
        result.sum_base_price,
        result.sum_disc_price,
        result.sum_charge,
        result.avg_qty / result.count_order,
        result.avg_price / result.count_order,
        result.avg_disc / result.count_order,
        result.count_order
      )))

      // Execute
      .execute()


  }