public IScalarEvaluatorFactory createEvaluatorFactory()

in asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java [56:106]


    /**
     * Builds the evaluator factory for this function. The evaluators it creates emit, as a
     * serialized datetime, the previous channel execution time recorded for the channel
     * named by the first argument.
     *
     * @param args argument evaluator factories; only {@code args[0]} (the channel name) is used
     * @return a factory producing one evaluator per call to {@code createScalarEvaluator}
     */
    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
        return new IScalarEvaluatorFactory() {

            private static final long serialVersionUID = 1L;

            @Override
            public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
                return new IScalarEvaluator() {

                    // Reusable output buffer; reset at the start of every evaluate() call.
                    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
                    private final DataOutput out = resultStorage.getDataOutput();
                    private final IPointable argPtr0 = new VoidPointable();
                    private final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx);

                    @SuppressWarnings("unchecked")
                    private final ISerializerDeserializer<ADateTime> datetimeSerde =
                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
                    private final AMutableDateTime aDateTime = new AMutableDateTime(0);
                    private final IHyracksJobletContext jobletCtx = ctx.getTaskContext().getJobletContext();

                    /**
                     * Writes the previous channel execution time into {@code result}.
                     *
                     * If no {@code ActiveTimestampState} is registered in the task context under
                     * {@code ActiveStateKey}, the channel name is read from the first argument, the
                     * channel timestamps are progressed through {@code ActiveTimestampManager}, and a
                     * new state holding the previous and current times is stored in the task context.
                     * Later calls that find that state reuse it and skip the argument evaluation.
                     *
                     * @param tuple  the input tuple passed to the argument evaluator
                     * @param result pointable set to the serialized datetime value
                     * @throws HyracksDataException if argument evaluation or serialization fails
                     */
                    @Override
                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {

                        ActiveTimestampState existingState =
                                (ActiveTimestampState) ctx.getTaskContext().getStateObject(ActiveStateKey);
                        resultStorage.reset();

                        if (existingState == null) {
                            eval0.evaluate(tuple, argPtr0);
                            // Skip the first two bytes of the serialized argument before decoding.
                            // NOTE(review): presumably a 1-byte type tag plus a 1-byte length prefix;
                            // confirm this holds for channel names whose length needs more than one byte.
                            // The charset is explicit so decoding does not depend on the platform
                            // default (assumes the serialized string is UTF-8 -- TODO confirm).
                            String channelName = new String(argPtr0.getByteArray(), argPtr0.getStartOffset() + 2,
                                    argPtr0.getLength() - 2, java.nio.charset.StandardCharsets.UTF_8);
                            String nodeId = jobletCtx.getServiceContext().getNodeId();
                            // Progress the timestamps first, then read back the previous/current pair.
                            ActiveTimestampManager.progressChannelExecutionTimestamps(jobletCtx.getJobId(), channelName,
                                    nodeId);
                            long previousChannelTime =
                                    ActiveTimestampManager.getPreviousChannelExecutionTimestamp(channelName, nodeId);
                            long currentChannelTime =
                                    ActiveTimestampManager.getCurrentChannelExecutionTimestamp(channelName, nodeId);
                            existingState = new ActiveTimestampState(jobletCtx.getJobId(), ActiveStateKey);
                            existingState.setExecutionTime(previousChannelTime, currentChannelTime);
                            // Store the state so subsequent lookups under ActiveStateKey find it.
                            ctx.getTaskContext().setStateObject(existingState);
                        }
                        aDateTime.setValue(existingState.getPreviousChannelExecutionTime());
                        datetimeSerde.serialize(aDateTime, out);
                        result.set(resultStorage);
                    }
                };
            }
        };
    }