in interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java [547:700]
public RelNode visit(GraphLogicalAggregate aggregate) {
visitChildren(aggregate);
List<RelDataTypeField> fields = aggregate.getRowType().getFieldList();
List<GraphAggCall> groupCalls = aggregate.getAggCalls();
GraphGroupKeys keys = aggregate.getGroupKey();
if (groupCalls.isEmpty()) { // transform to project + dedup by keys
Preconditions.checkArgument(
keys.groupKeyCount() > 0,
"group keys should not be empty while group calls is empty");
GraphAlgebraPhysical.Project.Builder projectBuilder =
GraphAlgebraPhysical.Project.newBuilder();
for (int i = 0; i < keys.groupKeyCount(); ++i) {
RexNode var = keys.getVariables().get(i);
Preconditions.checkArgument(
var instanceof RexGraphVariable,
"each group key should be type %s, but is %s",
RexGraphVariable.class,
var.getClass());
OuterExpression.Expression expr =
var.accept(new RexToProtoConverter(true, isColumnId, this.rexBuilder));
int aliasId;
if (i >= fields.size()
|| (aliasId = fields.get(i).getIndex()) == AliasInference.DEFAULT_ID) {
throw new IllegalArgumentException(
"each group key should have an alias if need dedup");
}
GraphAlgebraPhysical.Project.ExprAlias.Builder projectExprAliasBuilder =
GraphAlgebraPhysical.Project.ExprAlias.newBuilder();
projectExprAliasBuilder.setExpr(expr);
if (aliasId != AliasInference.DEFAULT_ID) {
projectExprAliasBuilder.setAlias(Utils.asAliasId(aliasId));
}
projectBuilder.addMappings(projectExprAliasBuilder.build());
}
GraphAlgebra.Dedup.Builder dedupBuilder = GraphAlgebra.Dedup.newBuilder();
for (int i = 0; i < keys.groupKeyCount(); ++i) {
RelDataTypeField field = fields.get(i);
RexVariable rexVar =
RexGraphVariable.of(
field.getIndex(),
AliasInference.DEFAULT_COLUMN_ID,
field.getName(),
field.getType());
OuterExpression.Variable exprVar =
rexVar.accept(new RexToProtoConverter(true, isColumnId, this.rexBuilder))
.getOperators(0)
.getVar();
dedupBuilder.addKeys(exprVar);
}
GraphAlgebraPhysical.PhysicalOpr.Builder projectOprBuilder =
GraphAlgebraPhysical.PhysicalOpr.newBuilder();
projectOprBuilder.setOpr(
GraphAlgebraPhysical.PhysicalOpr.Operator.newBuilder()
.setProject(projectBuilder));
GraphAlgebraPhysical.PhysicalOpr.Builder dedupOprBuilder =
GraphAlgebraPhysical.PhysicalOpr.newBuilder();
dedupOprBuilder.setOpr(
GraphAlgebraPhysical.PhysicalOpr.Operator.newBuilder().setDedup(dedupBuilder));
if (isPartitioned) {
Map<Integer, Set<GraphNameOrId>> tagColumns =
Utils.extractTagColumnsFromRexNodes(keys.getVariables());
if (preCacheEdgeProps) {
Utils.removeEdgeProperties(
com.alibaba.graphscope.common.ir.tools.Utils.getOutputType(
aggregate.getInput()),
tagColumns);
}
lazyPropertyFetching(tagColumns);
}
physicalBuilder.addPlan(projectOprBuilder.build());
physicalBuilder.addPlan(dedupOprBuilder.build());
} else {
GraphAlgebraPhysical.PhysicalOpr.Builder oprBuilder =
GraphAlgebraPhysical.PhysicalOpr.newBuilder();
GraphAlgebraPhysical.GroupBy.Builder groupByBuilder =
GraphAlgebraPhysical.GroupBy.newBuilder();
for (int i = 0; i < keys.groupKeyCount(); ++i) {
RexNode var = keys.getVariables().get(i);
Preconditions.checkArgument(
var instanceof RexGraphVariable,
"each group key should be type %s, but is %s",
RexGraphVariable.class,
var.getClass());
OuterExpression.Variable exprVar =
var.accept(new RexToProtoConverter(true, isColumnId, this.rexBuilder))
.getOperators(0)
.getVar();
int aliasId = fields.get(i).getIndex();
GraphAlgebraPhysical.GroupBy.KeyAlias.Builder keyAliasBuilder =
GraphAlgebraPhysical.GroupBy.KeyAlias.newBuilder();
keyAliasBuilder.setKey(exprVar);
if (aliasId != AliasInference.DEFAULT_ID) {
keyAliasBuilder.setAlias(Utils.asAliasId(aliasId));
}
groupByBuilder.addMappings(keyAliasBuilder);
}
for (int i = 0; i < groupCalls.size(); ++i) {
List<RexNode> operands = groupCalls.get(i).getOperands();
if (operands.isEmpty()) {
throw new IllegalArgumentException(
"operands in aggregate call should not be empty");
}
GraphAlgebraPhysical.GroupBy.AggFunc.Builder aggFnAliasBuilder =
GraphAlgebraPhysical.GroupBy.AggFunc.newBuilder();
for (RexNode operand : operands) {
Preconditions.checkArgument(
operand instanceof RexGraphVariable,
"each expression in aggregate call should be type %s, but is %s",
RexGraphVariable.class,
operand.getClass());
OuterExpression.Variable var =
operand.accept(
new RexToProtoConverter(
true, isColumnId, this.rexBuilder))
.getOperators(0)
.getVar();
aggFnAliasBuilder.addVars(var);
}
GraphAlgebraPhysical.GroupBy.AggFunc.Aggregate aggOpt =
Utils.protoAggOpt(groupCalls.get(i));
aggFnAliasBuilder.setAggregate(aggOpt);
int aliasId = fields.get(i + keys.groupKeyCount()).getIndex();
if (aliasId != AliasInference.DEFAULT_ID) {
aggFnAliasBuilder.setAlias(Utils.asAliasId(aliasId));
}
groupByBuilder.addFunctions(aggFnAliasBuilder);
}
oprBuilder.setOpr(
GraphAlgebraPhysical.PhysicalOpr.Operator.newBuilder()
.setGroupBy(groupByBuilder));
oprBuilder.addAllMetaData(
Utils.physicalProtoRowType(aggregate.getRowType(), isColumnId));
if (isPartitioned) {
List<RexNode> keysAndAggs = Lists.newArrayList();
keysAndAggs.addAll(keys.getVariables());
keysAndAggs.addAll(
groupCalls.stream()
.flatMap(k -> k.getOperands().stream())
.collect(Collectors.toList()));
Map<Integer, Set<GraphNameOrId>> tagColumns =
Utils.extractTagColumnsFromRexNodes(keysAndAggs);
if (preCacheEdgeProps) {
Utils.removeEdgeProperties(
com.alibaba.graphscope.common.ir.tools.Utils.getOutputType(
aggregate.getInput()),
tagColumns);
}
lazyPropertyFetching(tagColumns);
}
physicalBuilder.addPlan(oprBuilder.build());
}
return aggregate;
}