in asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntime.java [93:121]
public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
IScalarEvaluatorFactory brokerTypeEvalFactory, EntityId activeJobId, boolean push, IAType recordType)
throws HyracksDataException {
this.tRef = new FrameTupleReference();
IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
eval0 = brokerEvalFactory.createScalarEvaluator(evalCtx);
eval1 = pushListEvalFactory.createScalarEvaluator(evalCtx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(evalCtx);
eval3 = brokerTypeEvalFactory.createScalarEvaluator(evalCtx);
this.entityId = activeJobId;
this.push = push;
if (push) {
//for push-based channel, the recordType is the result record type (records are sent directly)
jsonRecordPrinter =
new org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory(
(ARecordType) recordType).createPrinter();
admRecordPrinter = new org.apache.asterix.dataflow.data.nontagged.printers.adm.ARecordPrinterFactory(
(ARecordType) recordType).createPrinter();
} else {
//for pull-based channels, the recordType is a list of subscription ids
//the subscriptionIdListPrinterFactory is used instead
jsonRecordPrinter = null;
admRecordPrinter = null;
}
subscriptionIdListPrinterFactory =
new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
}