public IAggregatorDescriptor createAggregator()

in algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java [45:166]


    /**
     * Builds an {@link IAggregatorDescriptor} that drives one serialized aggregate evaluator per entry of
     * {@code aggFactories}. Each evaluator keeps its running state as bytes inside the group's state tuple,
     * in the fields that come right after the grouping keys.
     *
     * @param ctx
     *            task context handed to the evaluator factories when an evaluator is first needed
     * @param inRecordDescriptor
     *            not read by this implementation
     * @param outRecordDescriptor
     *            not read by this implementation
     * @param keyFields
     *            grouping key fields; only the number of keys is used, to locate the first aggregate field
     * @param keyFieldsInPartialResults
     *            not read by this implementation
     * @return a descriptor with lazily created evaluators
     * @throws HyracksDataException
     *             declared by the interface; the descriptor's methods wrap any {@code AlgebricksException}
     *             raised by an evaluator in a {@code HyracksDataException}
     */
    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
                    throws HyracksDataException {
        final int[] groupKeys = keyFields;

        // One IAggregatorDescriptor instance is created per group-by operator.
        return new IAggregatorDescriptor() {
            /** Reusable view over the input tuple currently being aggregated. */
            private final FrameTupleReference inputTuple = new FrameTupleReference();
            /** Evaluators, created on first use in {@link #init}; one per aggregate factory. */
            private final ISerializedAggregateEvaluator[] evaluators =
                    new ISerializedAggregateEvaluator[aggFactories.length];
            /** Index of the first aggregate state field, i.e. the number of grouping keys. */
            private final int firstAggFieldIndex = groupKeys.length;
            /**
             * Byte length of each evaluator's state as measured during the most recent {@link #init}.
             * NOTE(review): later calls reuse these lengths, so states are presumably fixed-size -- confirm.
             */
            private final int[] stateLengths = new int[aggFactories.length];

            @Override
            public AggregateState createAggregateStates() {
                return new AggregateState();
            }

            /**
             * Appends one initial state field per evaluator to {@code tb}, then feeds the first input tuple
             * into each freshly written state.
             * NOTE(review): state offsets are read from the builder's field end offsets at
             * {@code numKeys + i - 1}, so the key fields are presumably already in {@code tb} -- confirm.
             */
            @Override
            public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
                    throws HyracksDataException {
                DataOutput stateOut = tb.getDataOutput();
                inputTuple.reset(accessor, tIndex);
                try {
                    for (int a = 0; a < evaluators.length; a++) {
                        int sizeBefore = tb.getSize();
                        if (evaluators[a] == null) {
                            evaluators[a] = aggFactories[a].createAggregateEvaluator(ctx);
                        }
                        evaluators[a].init(stateOut);
                        tb.addFieldEndOffset();
                        // Remember how many bytes this evaluator's state occupies.
                        stateLengths[a] = tb.getSize() - sizeBefore;
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }

                // Initial aggregation step over the tuple that opened the group.
                inputTuple.reset(accessor, tIndex);
                try {
                    for (int a = 0; a < evaluators.length; a++) {
                        byte[] builderBytes = tb.getByteArray();
                        // A state begins where the preceding field ends; with no preceding field it begins at 0.
                        int precedingField = a + firstAggFieldIndex - 1;
                        int stateStart = 0;
                        if (precedingField >= 0) {
                            stateStart = tb.getFieldEndOffsets()[precedingField];
                        }
                        evaluators[a].step(inputTuple, builderBytes, stateStart, stateLengths[a]);
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }
            }

            /**
             * Folds the input tuple at {@code tIndex} into every evaluator's state, updating the bytes of the
             * state tuple at {@code stateTupleIndex} in place.
             */
            @Override
            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
                inputTuple.reset(accessor, tIndex);
                // Tuple start offset plus the field slots length; field start offsets are added to this base.
                int tupleStart = stateAccessor.getTupleStartOffset(stateTupleIndex);
                int fieldDataBase = tupleStart + stateAccessor.getFieldSlotsLength();
                try {
                    for (int a = 0; a < evaluators.length; a++) {
                        byte[] frameBytes = stateAccessor.getBuffer().array();
                        int relativeStart = stateAccessor.getFieldStartOffset(stateTupleIndex, a + firstAggFieldIndex);
                        evaluators[a].step(inputTuple, frameBytes, relativeStart + fieldDataBase, stateLengths[a]);
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }
            }

            /**
             * Writes each evaluator's partial result for the state tuple at {@code tIndex} into {@code tb},
             * one field per evaluator. Always returns {@code true}.
             */
            @Override
            public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor stateAccessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                byte[] frameBytes = stateAccessor.getBuffer().array();
                int cursor = firstStateOffset(stateAccessor, tIndex);
                try {
                    for (int a = 0; a < evaluators.length; a++) {
                        int length = stateLengths[a];
                        evaluators[a].finishPartial(frameBytes, cursor, length, tb.getDataOutput());
                        // States are walked back to back using the lengths recorded in init.
                        cursor += length;
                        tb.addFieldEndOffset();
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }
                return true;
            }

            /**
             * Writes each evaluator's final result for the state tuple at {@code tIndex} into {@code tb},
             * one field per evaluator. Always returns {@code true}.
             */
            @Override
            public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor stateAccessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                byte[] frameBytes = stateAccessor.getBuffer().array();
                int cursor = firstStateOffset(stateAccessor, tIndex);
                try {
                    for (int a = 0; a < evaluators.length; a++) {
                        int length = stateLengths[a];
                        evaluators[a].finish(frameBytes, cursor, length, tb.getDataOutput());
                        // States are walked back to back using the lengths recorded in init.
                        cursor += length;
                        tb.addFieldEndOffset();
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }
                return true;
            }

            /** Nothing to reset: this descriptor keeps no per-group state outside the state tuples. */
            @Override
            public void reset() {

            }

            @Override
            public void close() {
                reset();
            }

            /**
             * Computes the position, within the state accessor's buffer, of the first aggregate state field
             * of the tuple at {@code tIndex}.
             */
            private int firstStateOffset(IFrameTupleAccessor stateAccessor, int tIndex)
                    throws HyracksDataException {
                int tupleStart = stateAccessor.getTupleStartOffset(tIndex);
                int relativeFieldStart = stateAccessor.getFieldStartOffset(tIndex, firstAggFieldIndex);
                return tupleStart + stateAccessor.getFieldSlotsLength() + relativeFieldStart;
            }

        };
    }