in modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java [113:346]
/**
 * Splits the given logical aggregate into a MAP aggregate, an optional projection over the MAP output,
 * a REDUCE aggregate and, when REDUCE results need post-processing, a final projection over REDUCE.
 *
 * @param agg Logical aggregate to split.
 * @param builder Factory used to create the MAP aggregate, the projection between MAP and REDUCE,
 *         and the REDUCE aggregate.
 * @param fieldMappingOnReduce Mapping applied to the group set and grouping sets of the REDUCE aggregate.
 * @return The REDUCE aggregate when every aggregate's output can be used as is; otherwise an
 *         {@link IgniteProject} over the REDUCE aggregate whose row type is {@code agg.getRowType()}.
 */
public static IgniteRel buildAggregates(LogicalAggregate agg, AggregateRelBuilder builder, Mapping fieldMappingOnReduce) {
    //
    // To implement a MAP/REDUCE aggregate, the LogicalAggregate is transformed into
    // a map aggregate node, a reduce aggregate node, and an optional project node
    // (since some aggregates can be split into multiple ones, or require some additional work
    // after the REDUCE phase to combine the results).
    //
    // SELECT c1, MIN(c2), COUNT(c3) FROM test GROUP BY c1, c2
    //
    // MAP [c1, c2, map_agg1, map_agg2]
    // REDUCE [c1, c2, reduce_agg1, reduce_agg2]
    // PROJECT: [c1, c2, expr_agg1, expr_agg2]
    //
    // =>
    //
    // {map: map_agg1, reduce: reduce_agg1, expr: expr_agg1, ..}
    // {map: map_agg2, reduce: reduce_agg2, expr: expr_agg2, ..}
    //

    List<AggregateCall> aggCalls = agg.getAggCallList();

    // Create a list of descriptors for the map/reduce version of each aggregate call.
    // This list is later used to create the MAP/REDUCE version of each aggregate.
    List<MapReduceAgg> mapReduceAggs = new ArrayList<>(aggCalls.size());

    // groupSet includes all columns from GROUP BY/GROUPING SETS clauses.
    int argumentOffset = agg.getGroupSet().cardinality();

    //
    // MAP PHASE AGGREGATE
    //
    List<AggregateCall> mapAggCalls = new ArrayList<>(aggCalls.size());

    for (AggregateCall call : aggCalls) {
        // See ReturnTypes::AVG_AGG_FUNCTION: the result type of an aggregate with no grouping
        // or with filtering can be nullable.
        boolean canBeNull = agg.getGroupCount() == 0 || call.hasFilter();

        MapReduceAgg mapReduceAgg = createMapReduceAggCall(
                Commons.cluster(),
                call,
                argumentOffset,
                agg.getInput().getRowType(),
                canBeNull
        );

        argumentOffset += mapReduceAgg.reduceCalls.size();
        mapReduceAggs.add(mapReduceAgg);
        mapAggCalls.addAll(mapReduceAgg.mapCalls);
    }

    // The MAP phase must have at least as many aggregate calls as the original aggregate.
    // Otherwise there is a bug, because some aggregates were ignored.
    assert mapAggCalls.size() >= aggCalls.size() :
            format("The number of MAP aggregates is not correct. Original: {}\nMAP: {}", aggCalls, mapAggCalls);

    RelNode map = builder.makeMapAgg(
            agg.getCluster(),
            agg.getInput(),
            agg.getGroupSet(),
            agg.getGroupSets(),
            mapAggCalls
    );

    //
    // REDUCE INPUT PROJECTION
    //
    RelDataTypeFactory.Builder reduceType = new Builder(Commons.typeFactory());

    int groupByColumns = agg.getGroupSet().cardinality();
    boolean sameAggsForBothPhases = true;

    // Fields of the original aggregate's output row; loop-invariant, so fetched once.
    List<RelDataTypeField> outputRowFields = agg.getRowType().getFieldList();

    // Build the row type of the REDUCE phase.
    // It consists of columns from agg.groupSet followed by aggregate expressions.
    for (int i = 0; i < groupByColumns; i++) {
        RelDataType type = outputRowFields.get(i).getType();
        reduceType.add("f" + reduceType.getFieldCount(), type);
    }

    RexBuilder rexBuilder = agg.getCluster().getRexBuilder();
    IgniteTypeFactory typeFactory = (IgniteTypeFactory) agg.getCluster().getTypeFactory();

    // Start with an identity projection over every MAP output column.
    List<RelDataTypeField> mapFields = map.getRowType().getFieldList();
    List<RexNode> reduceInputExprs = new ArrayList<>(mapFields.size());

    for (int i = 0; i < mapFields.size(); i++) {
        reduceInputExprs.add(new RexInputRef(i, mapFields.get(i).getType()));
    }

    // Replace the argument columns of each REDUCE call with the expression requested by its descriptor.
    // If all of them are identity projections, there is no need to create a projection
    // between the MAP and REDUCE operators.
    boolean additionalProjectionsForReduce = false;
    int argIdx = groupByColumns;

    for (MapReduceAgg mapReduceAgg : mapReduceAggs) {
        for (int j = 0; j < mapReduceAgg.reduceCalls.size(); j++, argIdx++) {
            RexNode projExpr = mapReduceAgg.makeReduceInputExpr.makeExpr(rexBuilder, map, IntList.of(argIdx), typeFactory);

            reduceInputExprs.set(argIdx, projExpr);

            if (mapReduceAgg.makeReduceInputExpr != USE_INPUT_FIELD) {
                additionalProjectionsForReduce = true;
            }
        }
    }

    RelNode reduceInputNode;

    if (additionalProjectionsForReduce) {
        RelDataTypeFactory.Builder projectRow = new Builder(agg.getCluster().getTypeFactory());

        for (int i = 0; i < reduceInputExprs.size(); i++) {
            projectRow.add(String.valueOf(i), reduceInputExprs.get(i).getType());
        }

        RelDataType projectRowType = projectRow.build();

        reduceInputNode = builder.makeProject(agg.getCluster(), map, reduceInputExprs, projectRowType);
    } else {
        reduceInputNode = map;
    }

    //
    // REDUCE PHASE AGGREGATE
    //
    // Build a list of aggregate calls for the REDUCE phase.
    // Build a list of projections (arg-list, expr) that accept the REDUCE output and combine/collect/cast results.
    List<AggregateCall> reduceAggCalls = new ArrayList<>();
    List<Map.Entry<IntList, MakeReduceExpr>> projection = new ArrayList<>(mapReduceAggs.size());

    for (MapReduceAgg mapReduceAgg : mapReduceAggs) {
        // Update the row type returned by the REDUCE node.
        int callNum = 0;
        for (AggregateCall reduceCall : mapReduceAgg.reduceCalls) {
            reduceType.add("f" + callNum + "_" + reduceType.getFieldCount(), reduceCall.getType());
            reduceAggCalls.add(reduceCall);
            callNum += 1;
        }

        // Update the projection list.
        IntList reduceArgList = mapReduceAgg.argList;
        MakeReduceExpr projectionExpr = mapReduceAgg.makeReduceOutputExpr;
        projection.add(new SimpleEntry<>(reduceArgList, projectionExpr));

        if (projectionExpr != USE_INPUT_FIELD) {
            sameAggsForBothPhases = false;
        }
    }

    RelDataType reduceTypeToUse = sameAggsForBothPhases ? agg.getRowType() : reduceType.build();

    // If the number of aggregates on the MAP phase is larger than the number of aggregates on the REDUCE phase,
    // some of the MAP aggregates are not used by the REDUCE phase, and this is a bug.
    //
    // NOTE: In the general case the REDUCE phase can use more aggregates than the MAP phase,
    // but at the moment there is no support for such aggregates.
    assert mapAggCalls.size() <= reduceAggCalls.size() :
            format("The number of MAP/REDUCE aggregates is not correct. MAP: {}\nREDUCE: {}", mapAggCalls, reduceAggCalls);

    // Apply the mapping to groupSet/groupSets on the REDUCE phase.
    ImmutableBitSet groupSetOnReduce = Mappings.apply(fieldMappingOnReduce, agg.getGroupSet());
    List<ImmutableBitSet> groupSetsOnReduce = agg.getGroupSets().stream()
            .map(g -> Mappings.apply(fieldMappingOnReduce, g))
            .collect(Collectors.toList());

    IgniteRel reduce = builder.makeReduceAgg(
            agg.getCluster(),
            reduceInputNode,
            groupSetOnReduce,
            groupSetsOnReduce,
            reduceAggCalls,
            reduceTypeToUse
    );

    //
    // FINAL PROJECTION
    //
    // If every aggregate's REDUCE output can be used as is (USE_INPUT_FIELD),
    // there is no need to add a projection, because no additional actions are required to compute final results.
    if (sameAggsForBothPhases) {
        return reduce;
    }

    List<RexNode> projectionList = new ArrayList<>(projection.size() + groupByColumns);

    // The projection list returned by an AggregateNode consists of columns from the GROUP BY clause
    // and expressions that represent aggregate calls.
    // In case of MAP/REDUCE those expressions should compute the final result for each MAP/REDUCE aggregate.
    for (int i = 0; i < groupByColumns; i++) {
        RelDataType type = outputRowFields.get(i).getType();
        projectionList.add(new RexInputRef(i, type));
    }

    for (Map.Entry<IntList, MakeReduceExpr> expr : projection) {
        RexNode resultExpr = expr.getValue().makeExpr(rexBuilder, reduce, expr.getKey(), typeFactory);
        projectionList.add(resultExpr);
    }

    assert projectionList.size() == outputRowFields.size() :
            format("Projection size does not match. Expected: {} but got {}",
                    outputRowFields.size(), projectionList.size());

    for (int i = 0; i < projectionList.size(); i++) {
        RexNode resultExpr = projectionList.get(i);

        // Put the assertion here so we can see the expression that caused a type mismatch,
        // since Project::isValid only shows types.
        // The expected type is the one required by the original aggregate's row type;
        // the actual one is the type of the computed expression.
        assert resultExpr.getType().equals(outputRowFields.get(i).getType()) :
                format("Type at position#{} does not match. Expected: {} but got {}.\nREDUCE aggregates: {}\nRow: {}.\nExpr: {}",
                        i, outputRowFields.get(i).getType(), resultExpr.getType(), reduceAggCalls, outputRowFields, resultExpr);
    }

    return new IgniteProject(agg.getCluster(), reduce.getTraitSet(), reduce, projectionList, agg.getRowType());
}