private static WayangPlan createQ1()

in wayang-benchmark/src/main/java/org/apache/wayang/apps/tpch/TPCHQ1WithJavaNative.java [67:141]


    private static WayangPlan createQ1(String lineItemUrl, final int delta) {
        // Read the lineitem table.
        TextFileSource lineItemText = new TextFileSource(lineItemUrl, "UTF-8");

        // Parse the rows.
        MapOperator<String, LineItemTuple> parser = new MapOperator<>(
                (line) -> new LineItemTuple.Parser().parse(line), String.class, LineItemTuple.class
        );
        lineItemText.connectTo(0, parser, 0);

        // Filter by shipdate.
        final int maxShipdate = LineItemTuple.Parser.parseDate("1998-12-01") - delta;
        FilterOperator<LineItemTuple> filter = new FilterOperator<>(
                (tuple) -> tuple.L_SHIPDATE <= maxShipdate, LineItemTuple.class
        );
        parser.connectTo(0, filter, 0);

        // Project the queried attributes.
        MapOperator<LineItemTuple, ReturnTuple> projection = new MapOperator<>(
                (lineItemTuple) -> new ReturnTuple(
                        lineItemTuple.L_RETURNFLAG,
                        lineItemTuple.L_LINESTATUS,
                        lineItemTuple.L_QUANTITY,
                        lineItemTuple.L_EXTENDEDPRICE,
                        lineItemTuple.L_EXTENDEDPRICE * (1 - lineItemTuple.L_DISCOUNT),
                        lineItemTuple.L_EXTENDEDPRICE * (1 - lineItemTuple.L_DISCOUNT) * (1 + lineItemTuple.L_TAX),
                        lineItemTuple.L_QUANTITY,
                        lineItemTuple.L_EXTENDEDPRICE,
                        lineItemTuple.L_DISCOUNT,
                        1),
                LineItemTuple.class,
                ReturnTuple.class
        );
        filter.connectTo(0, projection, 0);

        // Aggregation phase 1.
        ReduceByOperator<ReturnTuple, GroupKey> aggregation = new ReduceByOperator<>(
                (returnTuple) -> new GroupKey(returnTuple.L_RETURNFLAG, returnTuple.L_LINESTATUS),
                ((t1, t2) -> {
                    t1.SUM_QTY += t2.SUM_QTY;
                    t1.SUM_BASE_PRICE += t2.SUM_BASE_PRICE;
                    t1.SUM_DISC_PRICE += t2.SUM_DISC_PRICE;
                    t1.SUM_CHARGE += t2.SUM_CHARGE;
                    t1.AVG_QTY += t2.AVG_QTY;
                    t1.AVG_PRICE += t2.AVG_PRICE;
                    t1.AVG_DISC += t2.AVG_DISC;
                    t1.COUNT_ORDER += t2.COUNT_ORDER;
                    return t1;
                }),
                GroupKey.class,
                ReturnTuple.class
        );
        projection.connectTo(0, aggregation, 0);

        // Aggregation phase 2: complete AVG operations.
        MapOperator<ReturnTuple, ReturnTuple> aggregationFinalization = new MapOperator<>(
                (t -> {
                    t.AVG_QTY /= t.COUNT_ORDER;
                    t.AVG_PRICE /= t.COUNT_ORDER;
                    t.AVG_DISC /= t.COUNT_ORDER;
                    return t;
                }),
                ReturnTuple.class,
                ReturnTuple.class
        );
        aggregation.connectTo(0, aggregationFinalization, 0);

        // TODO: Implement sorting (as of now not possible with Wayang's basic operators).

        // Print the results.
        LocalCallbackSink<ReturnTuple> sink = LocalCallbackSink.createStdoutSink(ReturnTuple.class);
        aggregationFinalization.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }