in wayang-benchmark/code/main/scala/org/apache/wayang/apps/tpch/queries/Query1.scala [62:138]
/**
 * Builds and executes the TPC-H Query 1 plan over the LINEITEM table and collects its result.
 *
 * The plan loads LINEITEM through `createTableSource`, keeps the line items whose ship date
 * lies at or before 1998-12-01 minus `delta`, projects the six columns needed, and aggregates
 * them per `(l_returnflag, l_linestatus)` pair: sums of quantity, base price, discounted price
 * and charge, averages of quantity, price and discount, and the number of rows per group.
 *
 * @param configuration     used to create the [[WayangContext]] and to look up the JDBC URL
 * @param jdbcPlatform      provides the name of the configuration property holding the JDBC URL
 * @param createTableSource factory called with a table name and its field names to obtain the
 *                          table source for LINEITEM
 * @param delta             offset subtracted from 1998-12-01 in the ship-date filter (the SQL
 *                          variant of the filter expresses it in days)
 * @param experiment        attached to the plan; its subject is annotated with the `jdbcUrl`
 *                          and `delta` settings of this run
 * @return the collected aggregates, one [[Query1.Result]] per group
 */
def apply(configuration: Configuration,
          jdbcPlatform: JdbcPlatformTemplate,
          createTableSource: (String, Seq[String]) => JdbcTableSource,
          delta: Int = 90)
         (implicit experiment: Experiment): Iterable[Query1.Result] = {

  // Set up the execution context with every plugin of this query.
  val wayangCtx = new WayangContext(configuration)
  for (plugin <- plugins) wayangCtx.register(plugin)

  val planBuilder = new PlanBuilder(wayangCtx)
    .withJobName(s"TPC-H (${this.getClass.getSimpleName})")
    .withUdfJarsOf(classOf[Query1])
    .withExperiment(experiment)

  // Record the parameters of this run on the experiment subject.
  val jdbcUrl = configuration.getStringProperty(jdbcPlatform.jdbcUrlProperty)
  experiment.getSubject.addConfiguration("jdbcUrl", jdbcUrl)
  experiment.getSubject.addConfiguration("delta", delta)

  // Local copy of the parameter that is referenced from the filter UDF and its SQL counterpart.
  val shipDateDelta = delta

  // Read, filter, project, and aggregate the line items; the chain is the method's result.
  planBuilder
    .readTable(createTableSource("LINEITEM", LineItem.fields))
    .withName("Load LINEITEM table")

    // Field 10 of a LINEITEM record is read as the ship date. NOTE(review): the Scala predicate
    // subtracts the delta from whatever unit CsvUtils.parseDate returns -- presumably days,
    // matching the SQL predicate; confirm against CsvUtils.
    .filter(
      lineItem =>
        CsvUtils.parseDate(lineItem.getString(10)) <= CsvUtils.parseDate("1998-12-01") - shipDateDelta,
      sqlUdf = s"date(l_shipdate) <= date('1998-12-01', '- ${shipDateDelta} day')",
      selectivity = 0.25
    )
    .withName("Filter line items")

    .projectRecords(Seq("l_returnflag", "l_linestatus", "l_quantity", "l_extendedprice", "l_discount", "l_tax"))
    .withName("Project line items")

    // Turn every projected record into a single-row partial aggregate. The avg_* slots carry
    // plain sums for now and are divided by the row count after the reduction.
    .map { row =>
      val returnFlag = row.getString(0)
      val lineStatus = row.getString(1)
      val quantity = row.getDouble(2)
      val extendedPrice = row.getDouble(3)
      val discount = row.getDouble(4)
      val tax = row.getDouble(5)
      val discountedPrice = extendedPrice * (1 - discount)
      Query1.Result(
        returnFlag,
        lineStatus,
        quantity,
        extendedPrice,
        discountedPrice,
        discountedPrice * (1 + tax),
        quantity,
        extendedPrice,
        discount,
        1
      )
    }
    .withName("Calculate result fields")

    // Merge the partial aggregates of each (return flag, line status) group field by field.
    .reduceByKey(
      partial => (partial.l_returnflag, partial.l_linestatus),
      (left, right) => Query1.Result(
        left.l_returnflag,
        left.l_linestatus,
        left.sum_qty + right.sum_qty,
        left.sum_base_price + right.sum_base_price,
        left.sum_disc_price + right.sum_disc_price,
        left.sum_charge + right.sum_charge,
        left.avg_qty + right.avg_qty,
        left.avg_price + right.avg_price,
        left.avg_disc + right.avg_disc,
        left.count_order + right.count_order
      )
    )
    .withName("Aggregate line items")

    // Convert the summed avg_* fields into actual averages.
    .map { group =>
      val rowCount = group.count_order
      Query1.Result(
        group.l_returnflag,
        group.l_linestatus,
        group.sum_qty,
        group.sum_base_price,
        group.sum_disc_price,
        group.sum_charge,
        group.avg_qty / rowCount,
        group.avg_price / rowCount,
        group.avg_disc / rowCount,
        rowCount
      )
    }
    .withName("Post-process line item aggregates")
    .collect()
}