in core/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java [132:310]
/**
 * Builds a {@code LogicalWindow} from the {@code RexOver} expressions found
 * in a {@code RexProgram}, and returns it underneath a projection that
 * carries the program's output field names.
 *
 * <p>The conversion proceeds in these steps:
 * <ol>
 * <li>each top-level {@code RexOver} in the program's expression list is
 *   run through a shuttle that swaps every literal it visits for a
 *   {@code RexInputRef} positioned after the input fields; the literals
 *   are collected and passed to the window as its constants;
 * <li>the rewritten {@code RexOver}s are bucketed by {@code WindowKey}
 *   (via {@code addWindows}) and each bucket becomes one {@code Group};
 * <li>an intermediate row type is derived: the fields of {@code child}
 *   followed by one field per aggregate call, group by group;
 * <li>the program's expressions are rewritten against that row type and
 *   the program's projections are emitted as a project on top of the
 *   window.
 * </ol>
 *
 * @param cluster Cluster; its type factory creates the intermediate row type
 * @param traitSet Trait set handed to the underlying {@code LogicalWindow}
 * @param relBuilder Builder used to push the window and add the final project
 * @param child Input relational expression
 * @param program Program whose windowed aggregates are converted
 * @return the relational expression built by {@code relBuilder}: the window
 *   followed by a project of the program's projections
 * @throws NullPointerException if a {@code RexOver} visited while rewriting
 *   the program's expressions has no aggregate call registered for it
 */
public static RelNode create(RelOptCluster cluster,
    RelTraitSet traitSet, RelBuilder relBuilder, RelNode child,
    final RexProgram program) {
  final RelDataType outRowType = program.getOutputRowType();
  // Read once; used both for naming aggregate fields and for the final
  // project.
  final List<String> outFieldNames = outRowType.getFieldNames();

  // Build a list of distinct groups, partitions and aggregate
  // functions.
  final Multimap<WindowKey, RexOver> windowMap =
      LinkedListMultimap.create();

  final int inputFieldCount = child.getRowType().getFieldCount();

  final Map<RexLiteral, RexInputRef> constantPool = new HashMap<>();
  final List<RexLiteral> constants = new ArrayList<>();

  // Identify constants in the expression tree and replace them with
  // references to newly generated constant pool.
  RexShuttle replaceConstants = new RexShuttle() {
    @Override public RexNode visitLiteral(RexLiteral literal) {
      RexInputRef ref = constantPool.get(literal);
      if (ref != null) {
        // Already pooled: reuse the same slot.
        return ref;
      }
      constants.add(literal);
      // The pool size is read BEFORE the put below, so the n-th distinct
      // literal gets index inputFieldCount + n (0-based), matching its
      // position in "constants".
      ref =
          new RexInputRef(constantPool.size() + inputFieldCount,
              literal.getType());
      constantPool.put(literal, ref);
      return ref;
    }
  };

  // Build a list of groups, partitions, and aggregate functions. Each
  // aggregate function will add its arguments as outputs of the input
  // program.
  //
  // Keyed by identity: the original RexOver instances are looked up again
  // when the program's expression list is rewritten further below.
  final Map<RexOver, RexOver> origToNewOver = new IdentityHashMap<>();
  for (RexNode agg : program.getExprList()) {
    if (agg instanceof RexOver) {
      final RexOver origOver = (RexOver) agg;
      final RexOver newOver = (RexOver) origOver.accept(replaceConstants);
      origToNewOver.put(origOver, newOver);
      addWindows(windowMap, newOver, inputFieldCount);
    }
  }

  // Converts local references into input references with the same index
  // and type. It holds no state, so a single instance serves every group.
  final RexShuttle localRefToInputRef = new RexShuttle() {
    @Override public RexNode visitLocalRef(RexLocalRef localRef) {
      return new RexInputRef(localRef.getIndex(), localRef.getType());
    }
  };

  final Map<RexOver, Window.RexWinAggCall> aggMap = new HashMap<>();
  List<Group> groups = new ArrayList<>();
  for (Map.Entry<WindowKey, Collection<RexOver>> entry
      : windowMap.asMap().entrySet()) {
    final WindowKey windowKey = entry.getKey();
    final List<RexWinAggCall> aggCalls = new ArrayList<>();
    for (RexOver over : entry.getValue()) {
      // aggMap.size() is read before this call is registered, so it is
      // the number of distinct RexOver keys recorded so far.
      final RexWinAggCall aggCall =
          new RexWinAggCall(
              over.getAggOperator(),
              over.getType(),
              toInputRefs(over.operands),
              aggMap.size(),
              over.isDistinct(),
              over.ignoreNulls());
      aggCalls.add(aggCall);
      aggMap.put(over, aggCall);
    }
    groups.add(
        new Group(
            windowKey.groupSet,
            windowKey.isRows,
            windowKey.lowerBound.accept(localRefToInputRef),
            windowKey.upperBound.accept(localRefToInputRef),
            windowKey.exclude,
            windowKey.orderKeys,
            aggCalls));
  }

  // Figure out the type of the inputs to the output program.
  // They are: the inputs to this rel, followed by the outputs of
  // each window.
  final List<Window.RexWinAggCall> flattenedAggCallList = new ArrayList<>();
  final List<Map.Entry<String, RelDataType>> fieldList =
      new ArrayList<>(child.getRowType().getFieldList());
  final int offset = fieldList.size();

  // Use better field names for agg calls that are projected.
  final Map<Integer, String> fieldNames = new HashMap<>();
  for (Ord<RexLocalRef> ref : Ord.zip(program.getProjectList())) {
    final int index = ref.e.getIndex();
    if (index >= offset) {
      fieldNames.put(index - offset, outFieldNames.get(ref.i));
    }
  }

  for (Ord<Group> window : Ord.zip(groups)) {
    for (Ord<RexWinAggCall> over : Ord.zip(window.e.aggCalls)) {
      // Add the k-th over expression of
      // the i-th window to the output of the program.
      //
      // NOTE(review): the lookup key is the call's position within its
      // own group, whereas fieldNames is keyed by expression index minus
      // the input field count. Confirm these are meant to coincide when
      // there is more than one group.
      String name = fieldNames.get(over.i);
      if (name == null || name.startsWith("$")) {
        // No projected name, or one starting with "$": fall back to a
        // name derived from the window and call positions.
        name = "w" + window.i + "$o" + over.i;
      }
      fieldList.add(Pair.of(name, over.e.getType()));
      flattenedAggCallList.add(over.e);
    }
  }
  final RelDataType intermediateRowType =
      cluster.getTypeFactory().createStructType(fieldList);

  // The output program is the windowed agg's program, combined with
  // the output calc (if it exists).
  RexShuttle shuttle =
      new RexShuttle() {
        @Override public RexNode visitOver(RexOver over) {
          // Look up the aggCall which this expr was translated to.
          // The message identifies the offending expression if the
          // lookup fails.
          final Window.RexWinAggCall aggCall =
              requireNonNull(aggMap.get(origToNewOver.get(over)),
                  () -> "aggMap.get(origToNewOver.get(over)) for " + over);
          assert RelOptUtil.eq(
              "over",
              over.getType(),
              "aggCall",
              aggCall.getType(),
              Litmus.THROW);

          // Find the index of the aggCall among all partitions of all
          // groups.
          final int aggCallIndex =
              flattenedAggCallList.indexOf(aggCall);
          assert aggCallIndex >= 0;

          // Replace expression with a reference to the window slot.
          final int index = inputFieldCount + aggCallIndex;
          assert RelOptUtil.eq(
              "over",
              over.getType(),
              "intermed",
              intermediateRowType.getFieldList().get(index).getType(),
              Litmus.THROW);
          return new RexInputRef(
              index,
              over.getType());
        }

        @Override public RexNode visitLocalRef(RexLocalRef localRef) {
          final int index = localRef.getIndex();
          if (index < inputFieldCount) {
            // Reference to input field.
            return localRef;
          }
          // Any other local reference is shifted by the number of
          // aggregate calls.
          return new RexLocalRef(
              flattenedAggCallList.size() + index,
              localRef.getType());
        }
      };

  final LogicalWindow window =
      LogicalWindow.create(traitSet, child, constants, intermediateRowType,
          groups);

  // The order that the "over" calls occur in the groups and
  // partitions may not match the order in which they occurred in the
  // original expression.
  // Add a project to permute them.
  final List<RexNode> refToWindow =
      toInputRefs(shuttle.visitList(program.getExprList()));

  final List<RexNode> projectList = new ArrayList<>();
  for (RexLocalRef inputRef : program.getProjectList()) {
    final int index = inputRef.getIndex();
    // The cast requires every projected expression to have been rewritten
    // to a RexInputRef; anything else fails here with ClassCastException.
    final RexInputRef ref = (RexInputRef) refToWindow.get(index);
    projectList.add(ref);
  }

  return relBuilder.push(window)
      .project(projectList, outFieldNames)
      .build();
}