in asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerPOperator.java [73:108]
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
DelegateOperator notify = (DelegateOperator) op;
LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar();
LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
LogicalVariable brokerTypeVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerTypeVar();
IVariableTypeEnvironment env = context.getTypeEnvironment(op.getInputs().get(0).getValue());
IAType recordType = (IAType) env.getVarType(pushListVar);
boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush();
int brokerColumn = inputSchemas[0].findVariable(brokerVar);
int pushColumn = inputSchemas[0].findVariable(pushListVar);
int executionColumn = inputSchemas[0].findVariable(executionVar);
int brokerTypeColumn = inputSchemas[0].findVariable(brokerTypeVar);
IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
IScalarEvaluatorFactory pushListEvalFactory = new ColumnAccessEvalFactory(pushColumn);
IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
IScalarEvaluatorFactory brokerTypeEvalFactory = new ColumnAccessEvalFactory(brokerTypeColumn);
NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, pushListEvalFactory,
channelExecutionEvalFactory, brokerTypeEvalFactory, entityId, push, recordType);
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
builder.contributeMicroOperator(op, runtime, recDesc);
// and contribute one edge from its child
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, notify, 0);
}