in asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java [300:338]
private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
LogicalVariable brokerTypeVar, AbstractLogicalOperator opAboveBrokersScan) {
Mutable<ILogicalExpression> endpointFieldName = new MutableObject<ILogicalExpression>(new ConstantExpression(
new AsterixConstantValue(new AString(BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_END_POINT))));
Mutable<ILogicalExpression> brokerTypeFieldName = new MutableObject<ILogicalExpression>(new ConstantExpression(
new AsterixConstantValue(new AString(BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_TYPE))));
DataSourceScanOperator brokerScan = null;
int index = 0;
for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
brokerScan = (DataSourceScanOperator) subOp.getValue();
break;
}
index++;
}
Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
new VariableReferenceExpression(brokerScan.getVariables().get(2)));
ScalarFunctionCallExpression brokerEndpointFieldAccessor = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, endpointFieldName);
ScalarFunctionCallExpression brokerTYpeFieldAccessor = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, brokerTypeFieldName);
ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(2);
varArray.add(brokerEndpointVar);
varArray.add(brokerTypeVar);
ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(2);
exprArray.add(new MutableObject<>(brokerEndpointFieldAccessor));
exprArray.add(new MutableObject<>(brokerTYpeFieldAccessor));
AssignOperator assignOp = new AssignOperator(varArray, exprArray);
//Place assignOp between the scan and the op above it
assignOp.getInputs().add(new MutableObject<>(brokerScan));
opAboveBrokersScan.getInputs().set(index, new MutableObject<>(assignOp));
return assignOp;
}