public static IgniteRel buildAggregates()

in modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java [113:346]


    /**
     * Splits the given {@link LogicalAggregate} into a two-phase (MAP/REDUCE) plan.
     *
     * <p>The produced plan consists of:
     * <ul>
     *     <li>a MAP aggregate over the input of {@code agg};</li>
     *     <li>an optional projection between MAP and REDUCE, created only when at least one aggregate
     *     requires a non-identity transformation of its MAP results;</li>
     *     <li>a REDUCE aggregate;</li>
     *     <li>an optional final {@link IgniteProject}, created only when at least one aggregate requires
     *     additional work after the REDUCE phase to produce its final result.</li>
     * </ul>
     *
     * @param agg Logical aggregate to split.
     * @param builder Builder used to create the MAP aggregate, the intermediate projection and the REDUCE aggregate.
     * @param fieldMappingOnReduce Mapping applied to the group set and grouping sets of {@code agg}
     *         to obtain the group set and grouping sets of the REDUCE aggregate.
     * @return The REDUCE aggregate when no final projection is required, otherwise a projection on top of it
     *         whose row type is the row type of {@code agg}.
     */
    public static IgniteRel buildAggregates(LogicalAggregate agg, AggregateRelBuilder builder, Mapping fieldMappingOnReduce) {

        //
        // To implement MAP/REDUCE aggregate LogicalAggregate is transformed into
        // a map aggregate node, a reduce aggregate node, and an optional project node
        // (since some aggregates can be split into multiple ones, or require some additional work after REDUCE phase,
        // to combine the results).
        //
        // SELECT c1, MIN(c2), COUNT(c3) FROM test GROUP BY c1, c2
        //
        // MAP      [c1, c2, map_agg1, map_agg2]
        // REDUCE   [c1, c2, reduce_agg1, reduce_agg2]
        // PROJECT: [c1, c2, expr_agg1, expr_agg2]
        //
        // =>
        //
        // {map: map_agg1, reduce: reduce_agg1, expr: expr_agg1, ..}
        // {map: map_agg2, reduce: reduce_agg2, expr: expr_agg2, ..}
        //

        // Create a list of descriptors for map/reduce version of the given arguments.
        // This list is later used to create MAP/REDUCE version of each aggregate.

        List<MapReduceAgg> mapReduceAggs = new ArrayList<>(agg.getAggCallList().size());
        // groupSet includes all columns from GROUP BY/GROUPING SETS clauses.
        int argumentOffset = agg.getGroupSet().cardinality();

        // MAP PHASE AGGREGATE

        List<AggregateCall> mapAggCalls = new ArrayList<>(agg.getAggCallList().size());

        for (AggregateCall call : agg.getAggCallList()) {
            // See ReturnTypes::AVG_AGG_FUNCTION: the result type of an aggregate with no grouping
            // or with filtering can be nullable.
            boolean canBeNull = agg.getGroupCount() == 0 || call.hasFilter();

            MapReduceAgg mapReduceAgg = createMapReduceAggCall(
                    Commons.cluster(),
                    call,
                    argumentOffset,
                    agg.getInput().getRowType(),
                    canBeNull
            );
            argumentOffset += mapReduceAgg.reduceCalls.size();
            mapReduceAggs.add(mapReduceAgg);

            mapAggCalls.addAll(mapReduceAgg.mapCalls);
        }

        // MAP phase should have at least as many aggregate calls as the original aggregate.
        // Otherwise there is a bug, because some aggregates were ignored.
        assert mapAggCalls.size() >= agg.getAggCallList().size() :
                format("The number of MAP aggregates is not correct. Original: {}\nMAP: {}", agg.getAggCallList(), mapAggCalls);

        RelNode map = builder.makeMapAgg(
                agg.getCluster(),
                agg.getInput(),
                agg.getGroupSet(),
                agg.getGroupSets(),
                mapAggCalls
        );

        //
        // REDUCE INPUT PROJECTION
        //

        RelDataTypeFactory.Builder reduceType = new Builder(Commons.typeFactory());

        // Output fields of the original aggregate; used to type GROUP BY columns and to validate the final projection.
        List<RelDataTypeField> aggOutputFields = agg.getRowType().getFieldList();
        int groupByColumns = agg.getGroupSet().cardinality();
        boolean sameAggsForBothPhases = true;

        // Build row type for input of REDUCE phase.
        // It consists of columns from agg.groupSet and aggregate expressions.

        for (int i = 0; i < groupByColumns; i++) {
            RelDataType type = aggOutputFields.get(i).getType();
            reduceType.add("f" + reduceType.getFieldCount(), type);
        }

        RexBuilder rexBuilder = agg.getCluster().getRexBuilder();
        IgniteTypeFactory typeFactory = (IgniteTypeFactory) agg.getCluster().getTypeFactory();

        // Start with an identity projection over every column produced by MAP.
        List<RelDataTypeField> mapFields = map.getRowType().getFieldList();
        List<RexNode> reduceInputExprs = new ArrayList<>(mapFields.size());

        for (int i = 0; i < mapFields.size(); i++) {
            RelDataType type = mapFields.get(i).getType();
            reduceInputExprs.add(new RexInputRef(i, type));
        }

        // Build a list of projections for reduce operator,
        // if all projections are identity, it is not necessary
        // to create a projection between MAP and REDUCE operators.

        boolean additionalProjectionsForReduce = false;

        for (int i = 0, argOffset = 0; i < mapReduceAggs.size(); i++) {
            MapReduceAgg mapReduceAgg = mapReduceAggs.get(i);
            int argIdx = groupByColumns + argOffset;

            for (int j = 0; j < mapReduceAgg.reduceCalls.size(); j++) {
                RexNode projExpr = mapReduceAgg.makeReduceInputExpr.makeExpr(rexBuilder, map, IntList.of(argIdx), typeFactory);
                reduceInputExprs.set(argIdx, projExpr);

                if (mapReduceAgg.makeReduceInputExpr != USE_INPUT_FIELD) {
                    additionalProjectionsForReduce = true;
                }

                argIdx += 1;
            }

            argOffset += mapReduceAgg.reduceCalls.size();
        }

        RelNode reduceInputNode;
        if (additionalProjectionsForReduce) {
            RelDataTypeFactory.Builder projectRow = new Builder(agg.getCluster().getTypeFactory());

            for (int i = 0; i < reduceInputExprs.size(); i++) {
                RexNode rexNode = reduceInputExprs.get(i);
                projectRow.add(String.valueOf(i), rexNode.getType());
            }

            RelDataType projectRowType = projectRow.build();

            reduceInputNode = builder.makeProject(agg.getCluster(), map, reduceInputExprs, projectRowType);
        } else {
            reduceInputNode = map;
        }

        //
        // REDUCE PHASE AGGREGATE
        //
        // Build a list of aggregate calls for REDUCE phase.
        // Build a list of projections (arg-list, expr) that accept reduce phase and combine/collect/cast results.

        List<AggregateCall> reduceAggCalls = new ArrayList<>();
        List<Map.Entry<IntList, MakeReduceExpr>> projection = new ArrayList<>(mapReduceAggs.size());

        for (MapReduceAgg mapReduceAgg : mapReduceAggs) {
            // Update row type returned by REDUCE node.
            int i = 0;
            for (AggregateCall reduceCall : mapReduceAgg.reduceCalls) {
                reduceType.add("f" + i + "_" + reduceType.getFieldCount(), reduceCall.getType());
                reduceAggCalls.add(reduceCall);
                i += 1;
            }

            // Update projection list
            IntList reduceArgList = mapReduceAgg.argList;
            MakeReduceExpr projectionExpr = mapReduceAgg.makeReduceOutputExpr;
            projection.add(new SimpleEntry<>(reduceArgList, projectionExpr));

            if (projectionExpr != USE_INPUT_FIELD) {
                sameAggsForBothPhases = false;
            }
        }

        RelDataType reduceTypeToUse;
        if (sameAggsForBothPhases) {
            reduceTypeToUse = agg.getRowType();
        } else {
            reduceTypeToUse = reduceType.build();
        }

        // If the number of aggregates on MAP phase is larger than the number of aggregates on REDUCE phase,
        // assume that some of MAP aggregates are not used by REDUCE phase and this is a bug.
        //
        // NOTE: In general case REDUCE phase can use more aggregates than MAP phase,
        // but at the moment there is no support for such aggregates.
        assert mapAggCalls.size() <= reduceAggCalls.size() :
                format("The number of MAP/REDUCE aggregates is not correct. MAP: {}\nREDUCE: {}", mapAggCalls, reduceAggCalls);

        // Apply mapping to groupSet/groupSets on REDUCE phase.
        ImmutableBitSet groupSetOnReduce = Mappings.apply(fieldMappingOnReduce, agg.getGroupSet());
        List<ImmutableBitSet> groupSetsOnReduce = agg.getGroupSets().stream()
                .map(g -> Mappings.apply(fieldMappingOnReduce, g))
                .collect(Collectors.toList());

        IgniteRel reduce = builder.makeReduceAgg(
                agg.getCluster(),
                reduceInputNode,
                groupSetOnReduce,
                groupSetsOnReduce,
                reduceAggCalls,
                reduceTypeToUse
        );

        //
        // FINAL PROJECTION
        //
        // if aggregate MAP phase uses the same aggregates as REDUCE phase,
        // there is no need to add a projection because no additional actions are required to compute final results.
        if (sameAggsForBothPhases) {
            return reduce;
        }

        List<RexNode> projectionList = new ArrayList<>(projection.size() + groupByColumns);

        // Projection list returned by AggregateNode consists of columns from GROUP BY clause
        // and expressions that represent aggregate calls.
        // In case of MAP/REDUCE those expressions should compute final results for each MAP/REDUCE aggregate.

        for (int i = 0; i < groupByColumns; i++) {
            RelDataType type = aggOutputFields.get(i).getType();
            projectionList.add(new RexInputRef(i, type));
        }

        for (Map.Entry<IntList, MakeReduceExpr> expr : projection) {
            RexNode resultExpr = expr.getValue().makeExpr(rexBuilder, reduce, expr.getKey(), typeFactory);
            projectionList.add(resultExpr);
        }

        assert projectionList.size() == aggOutputFields.size() :
                format("Projection size does not match. Expected: {} but got {}",
                        aggOutputFields.size(), projectionList.size());

        for (int i = 0; i < projectionList.size(); i++) {
            RexNode resultExpr = projectionList.get(i);
            RelDataType expectedType = aggOutputFields.get(i).getType();

            // Put assertion here so we can see an expression that caused a type mismatch,
            // since Project::isValid only shows types.
            // The expected type comes from the original aggregate's row type; the actual type from the expression.
            assert resultExpr.getType().equals(expectedType) :
                    format("Type at position#{} does not match. Expected: {} but got {}.\nREDUCE aggregates: {}\nRow: {}.\nExpr: {}",
                            i, expectedType, resultExpr.getType(), reduceAggCalls, aggOutputFields, resultExpr);
        }

        return new IgniteProject(agg.getCluster(), reduce.getTraitSet(), reduce, projectionList, agg.getRowType());
    }