in wayang-benchmark/src/main/java/org/apache/wayang/apps/tpch/TPCHQ1WithJavaNative.java [67:141]
/**
 * Assembles the Wayang plan for TPC-H query 1 over the {@code lineitem} table.
 *
 * <p>Dataflow: text source, then parse each line into a {@link LineItemTuple}, then keep rows whose
 * ship date is at or before the cutoff. Next, project into {@link ReturnTuple}, reduce by
 * {@code (L_RETURNFLAG, L_LINESTATUS)}, and divide the AVG_* accumulators by COUNT_ORDER. Finally,
 * print the results to stdout. The query's final ordering step is not applied.
 *
 * @param lineItemUrl location of the {@code lineitem} text file, read with the "UTF-8" encoding
 * @param delta amount subtracted from the encoded date 1998-12-01 to form the inclusive ship-date
 *              cutoff (presumably days, matching {@code LineItemTuple.Parser.parseDate}'s encoding,
 *              TODO confirm)
 * @return a plan rooted at a sink that prints every resulting {@link ReturnTuple} to stdout
 */
private static WayangPlan createQ1(String lineItemUrl, final int delta) {
    // Inclusive upper bound on L_SHIPDATE, computed once and captured by the filter predicate.
    final int shipdateCutoff = LineItemTuple.Parser.parseDate("1998-12-01") - delta;

    // ---- Operators ----------------------------------------------------------------------

    // Raw lines of the lineitem table.
    TextFileSource source = new TextFileSource(lineItemUrl, "UTF-8");

    // One LineItemTuple per input line (a fresh Parser is created for every line).
    MapOperator<String, LineItemTuple> lineParser = new MapOperator<>(
            row -> new LineItemTuple.Parser().parse(row),
            String.class,
            LineItemTuple.class
    );

    // Keep only rows shipped on or before the cutoff.
    FilterOperator<LineItemTuple> shipdateFilter = new FilterOperator<>(
            item -> item.L_SHIPDATE <= shipdateCutoff,
            LineItemTuple.class
    );

    // Build one ReturnTuple per row. The AVG_* slots presumably hold plain sums until the
    // averaging step below, and the trailing 1 is a per-row count (argument order follows
    // ReturnTuple's constructor, TODO confirm).
    MapOperator<LineItemTuple, ReturnTuple> projector = new MapOperator<>(
            li -> new ReturnTuple(
                    li.L_RETURNFLAG,
                    li.L_LINESTATUS,
                    li.L_QUANTITY,
                    li.L_EXTENDEDPRICE,
                    li.L_EXTENDEDPRICE * (1 - li.L_DISCOUNT),
                    li.L_EXTENDEDPRICE * (1 - li.L_DISCOUNT) * (1 + li.L_TAX),
                    li.L_QUANTITY,
                    li.L_EXTENDEDPRICE,
                    li.L_DISCOUNT,
                    1
            ),
            LineItemTuple.class,
            ReturnTuple.class
    );

    // Group on (return flag, line status) and fold each group by field-wise addition into the
    // left operand, which is then returned.
    ReduceByOperator<ReturnTuple, GroupKey> reducer = new ReduceByOperator<>(
            r -> new GroupKey(r.L_RETURNFLAG, r.L_LINESTATUS),
            (acc, next) -> {
                acc.SUM_QTY += next.SUM_QTY;
                acc.SUM_BASE_PRICE += next.SUM_BASE_PRICE;
                acc.SUM_DISC_PRICE += next.SUM_DISC_PRICE;
                acc.SUM_CHARGE += next.SUM_CHARGE;
                acc.AVG_QTY += next.AVG_QTY;
                acc.AVG_PRICE += next.AVG_PRICE;
                acc.AVG_DISC += next.AVG_DISC;
                acc.COUNT_ORDER += next.COUNT_ORDER;
                return acc;
            },
            GroupKey.class,
            ReturnTuple.class
    );

    // Turn the accumulated AVG_* sums into averages by dividing each by the group's row count.
    MapOperator<ReturnTuple, ReturnTuple> averager = new MapOperator<>(
            grouped -> {
                grouped.AVG_QTY /= grouped.COUNT_ORDER;
                grouped.AVG_PRICE /= grouped.COUNT_ORDER;
                grouped.AVG_DISC /= grouped.COUNT_ORDER;
                return grouped;
            },
            ReturnTuple.class,
            ReturnTuple.class
    );

    // Emit each final row to standard output.
    LocalCallbackSink<ReturnTuple> stdout = LocalCallbackSink.createStdoutSink(ReturnTuple.class);

    // ---- Wiring (output slot 0 -> input slot 0 throughout) ------------------------------

    source.connectTo(0, lineParser, 0);
    lineParser.connectTo(0, shipdateFilter, 0);
    shipdateFilter.connectTo(0, projector, 0);
    projector.connectTo(0, reducer, 0);
    reducer.connectTo(0, averager, 0);
    // TODO: Apply the final sort on (L_RETURNFLAG, L_LINESTATUS) (as of now not possible with
    // Wayang's basic operators).
    averager.connectTo(0, stdout, 0);

    return new WayangPlan(stdout);
}