in asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java [241:298]
/**
 * Assembles the operator chain that ends in the broker-notification operator for this channel.
 *
 * The resulting chain, bottom to top, is:
 * {@code eOp -> Distinct(sendVar) -> GroupBy(brokerEndpointVar, brokerTypeVar, channelExecutionVar)
 * -> broker operator}. The group-by carries one nested plan that aggregates {@code sendVar}
 * with LISTIFY into a freshly allocated variable, and that variable is what is handed to
 * {@code createBrokerOp}.
 *
 * @param brokerEndpointVar   variable used as the first group-by key and passed to the broker operator
 * @param sendVar             variable that is made distinct and then listified per group
 * @param channelExecutionVar variable used as the third group-by key and passed to the broker operator
 * @param brokerTypeVar       variable used as the second group-by key and passed to the broker operator
 * @param context             optimization context; supplies the new list variable and the type environments
 * @param eOp                 existing operator that becomes the input of the distinct operator
 * @param distributeOp        not referenced inside this method
 * @param channelDataverse    forwarded unchanged to {@code createBrokerOp}
 * @param channelName         forwarded unchanged to {@code createBrokerOp}
 * @return the broker operator sitting on top of the newly built chain
 * @throws AlgebricksException declared by this method; may be raised by the calls it makes
 */
private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
        LogicalVariable channelExecutionVar, LogicalVariable brokerTypeVar, IOptimizationContext context,
        ILogicalOperator eOp, DistributeResultOperator distributeOp, DataverseName channelDataverse,
        String channelName) throws AlgebricksException {
    // Distinct on the send variable, fed by the previous top of the plan.
    List<Mutable<ILogicalExpression>> distinctKeys = new ArrayList<>();
    distinctKeys.add(new MutableObject<>(new VariableReferenceExpression(sendVar)));
    DistinctOperator dedupOp = new DistinctOperator(distinctKeys);
    dedupOp.getInputs().add(new MutableObject<>(eOp));

    // Group-by over endpoint, broker type and channel execution, reading from the distinct.
    // The nested-plan list is handed to the constructor empty and filled in further down,
    // because the nested tuple source needs a reference to the group-by itself.
    List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> keyPairs = new ArrayList<>();
    List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorPairs = new ArrayList<>();
    List<ILogicalPlan> subPlans = new ArrayList<>();
    GroupByOperator groupOp = new GroupByOperator(keyPairs, decorPairs, subPlans);
    LogicalVariable[] groupingVars = { brokerEndpointVar, brokerTypeVar, channelExecutionVar };
    for (LogicalVariable groupingVar : groupingVars) {
        groupOp.addGbyExpression(null, new VariableReferenceExpression(groupingVar));
    }
    groupOp.getInputs().add(new MutableObject<>(dedupOp));

    // Nested plan: tuple source -> aggregate that listifies sendVar into a new variable.
    NestedTupleSourceOperator tupleSourceOp = new NestedTupleSourceOperator(new MutableObject<>(groupOp));
    LogicalVariable sendListVar = context.newVar();

    AggregateFunctionCallExpression listifyCall =
            BuiltinFunctions.makeAggregateFunctionExpression(BuiltinFunctions.LISTIFY, new ArrayList<>());
    listifyCall.getArguments().add(new MutableObject<>(new VariableReferenceExpression(sendVar)));

    List<LogicalVariable> producedVars = new ArrayList<>();
    producedVars.add(sendListVar);
    List<Mutable<ILogicalExpression>> producedExprs = new ArrayList<>();
    producedExprs.add(new MutableObject<>(listifyCall));

    AggregateOperator collectOp = new AggregateOperator(producedVars, producedExprs);
    collectOp.getInputs().add(new MutableObject<>(tupleSourceOp));
    subPlans.add(new ALogicalPlanImpl(new MutableObject<>(collectOp)));

    // Broker operator on top of the group-by, consuming the listified variable.
    // NOTE(review): the meaning of the trailing 'false' is defined by createBrokerOp - confirm there.
    DelegateOperator brokerOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar,
            brokerTypeVar, channelDataverse, channelName, false);
    brokerOp.getInputs().add(new MutableObject<>(groupOp));

    // Type environments are computed bottom up; the order of this array is significant.
    ILogicalOperator[] bottomUp = { dedupOp, tupleSourceOp, collectOp, groupOp, brokerOp };
    for (ILogicalOperator op : bottomUp) {
        context.computeAndSetTypeEnvironmentForOperator(op);
    }
    return brokerOp;
}