public void contributeRuntimeOperator()

in asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerPOperator.java [73:108]


    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
            throws AlgebricksException {
        DelegateOperator notify = (DelegateOperator) op;
        LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar();
        LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
        LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
        LogicalVariable brokerTypeVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerTypeVar();

        IVariableTypeEnvironment env = context.getTypeEnvironment(op.getInputs().get(0).getValue());
        IAType recordType = (IAType) env.getVarType(pushListVar);

        boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush();

        int brokerColumn = inputSchemas[0].findVariable(brokerVar);
        int pushColumn = inputSchemas[0].findVariable(pushListVar);
        int executionColumn = inputSchemas[0].findVariable(executionVar);
        int brokerTypeColumn = inputSchemas[0].findVariable(brokerTypeVar);

        IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
        IScalarEvaluatorFactory pushListEvalFactory = new ColumnAccessEvalFactory(pushColumn);
        IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
        IScalarEvaluatorFactory brokerTypeEvalFactory = new ColumnAccessEvalFactory(brokerTypeColumn);

        NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, pushListEvalFactory,
                channelExecutionEvalFactory, brokerTypeEvalFactory, entityId, push, recordType);

        RecordDescriptor recDesc =
                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);

        builder.contributeMicroOperator(op, runtime, recDesc);

        // and contribute one edge from its child
        ILogicalOperator src = op.getInputs().get(0).getValue();
        builder.contributeGraphEdge(src, 0, notify, 0);
    }