def apply()

in wayang-benchmark/code/main/scala/org/apache/wayang/apps/tpch/queries/Query1.scala [62:138]


  /**
    * Builds and executes the Wayang plan for this TPC-H query over the `LINEITEM` table and
    * collects one aggregated [[Query1.Result]] per `(l_returnflag, l_linestatus)` pair.
    *
    * The plan keeps the line items whose ship date is at most `1998-12-01` minus `delta`,
    * projects the six columns needed for the aggregation, sums them per key and finally
    * turns the summed `avg_*` fields into averages by dividing by the per-key row count.
    *
    * As a side effect, the JDBC URL and `delta` are recorded as configuration entries on the
    * subject of `experiment`.
    *
    * @param configuration     used to create the [[WayangContext]] and to look up the JDBC URL
    * @param jdbcPlatform      supplies the name of the configuration property holding the JDBC URL
    * @param createTableSource creates the table source from a table name and its column names
    * @param delta             offset subtracted from `1998-12-01` to obtain the ship date cutoff
    *                          (interpreted as days in the SQL predicate)
    * @param experiment        experiment that the plan execution is attached to
    * @return the aggregated line item statistics, one element per key
    */
  def apply(configuration: Configuration,
            jdbcPlatform: JdbcPlatformTemplate,
            createTableSource: (String, Seq[String]) => JdbcTableSource,
            delta: Int = 90)
           (implicit experiment: Experiment): Iterable[Query1.Result] = {

    val wayangCtx = new WayangContext(configuration)
    plugins.foreach(wayangCtx.register)
    val planBuilder = new PlanBuilder(wayangCtx)
      .withJobName(s"TPC-H (${this.getClass.getSimpleName})")
      .withUdfJarsOf(classOf[Query1])
      .withExperiment(experiment)

    experiment.getSubject.addConfiguration("jdbcUrl", configuration.getStringProperty(jdbcPlatform.jdbcUrlProperty))
    experiment.getSubject.addConfiguration("delta", delta)

    // The cutoff does not depend on the individual record, so compute it once here instead of
    // re-parsing the constant date inside the filter UDF for every line item.
    // NOTE(review): assumes CsvUtils.parseDate yields a day-granular number, so that subtracting
    // `delta` matches the `'- delta day'` modifier of the SQL predicate -- confirm.
    val shipDateCutoff = CsvUtils.parseDate("1998-12-01") - delta

    // Read, filter, and project the line item data.
    planBuilder
      .readTable(createTableSource("LINEITEM", LineItem.fields))
      .withName("Load LINEITEM table")

      // Field 10 of the unprojected record is compared as the ship date (`l_shipdate` in the SQL variant).
      .filter(t => CsvUtils.parseDate(t.getString(10)) <= shipDateCutoff,
        sqlUdf = s"date(l_shipdate) <= date('1998-12-01', '- ${delta} day')", selectivity = .25)
      .withName("Filter line items")

      .projectRecords(Seq("l_returnflag", "l_linestatus", "l_quantity", "l_extendedprice", "l_discount", "l_tax"))
      .withName("Project line items")

      // Turn every projected record into a single-row partial aggregate (count = 1).
      .map(record => {
        val quantity = record.getDouble(2)
        val extendedPrice = record.getDouble(3)
        val discount = record.getDouble(4)
        val discountedPrice = extendedPrice * (1 - discount)
        Query1.Result(
          record.getString(0),
          record.getString(1),
          quantity,
          extendedPrice,
          discountedPrice,
          discountedPrice * (1 + record.getDouble(5)),
          quantity,
          extendedPrice,
          discount,
          1
        )
      })
      .withName("Calculate result fields")

      // All numeric fields are summed here; the avg_* fields hold sums until the final map.
      .reduceByKey(
        result => (result.l_returnflag, result.l_linestatus),
        (r1, r2) => Query1.Result(
          r1.l_returnflag,
          r1.l_linestatus,
          r1.sum_qty + r2.sum_qty,
          r1.sum_base_price + r2.sum_base_price,
          r1.sum_disc_price + r2.sum_disc_price,
          r1.sum_charge + r2.sum_charge,
          r1.avg_qty + r2.avg_qty,
          r1.avg_price + r2.avg_price,
          r1.avg_disc + r2.avg_disc,
          r1.count_order + r2.count_order
        )
      )
      .withName("Aggregate line items")

      // Convert the summed avg_* fields into actual averages.
      .map(result => Query1.Result(
        result.l_returnflag,
        result.l_linestatus,
        result.sum_qty,
        result.sum_base_price,
        result.sum_disc_price,
        result.sum_charge,
        result.avg_qty / result.count_order,
        result.avg_price / result.count_order,
        result.avg_disc / result.count_order,
        result.count_order
      ))
      .withName("Post-process line item aggregates")
      .collect()
  }