public IFieldAggregateDescriptor createAggregator()

in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java [61:173]


    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor) throws HyracksDataException {

        return new IFieldAggregateDescriptor() {

            @Override
            public void reset() {
            }

            @Override
            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                    throws HyracksDataException {
                int sum, count;
                if (!useObjectState) {
                    sum = IntegerPointable.getInteger(data, offset);
                    count = IntegerPointable.getInteger(data, offset + 4);
                } else {
                    Integer[] fields = (Integer[]) state.state;
                    sum = fields[0];
                    count = fields[1];
                }
                try {
                    fieldOutput.writeInt(sum);
                    fieldOutput.writeInt(count);
                } catch (IOException e) {
                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                }
            }

            @Override
            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                    throws HyracksDataException {
                int sum, count;
                if (!useObjectState) {
                    sum = IntegerPointable.getInteger(data, offset);
                    count = IntegerPointable.getInteger(data, offset + 4);
                } else {
                    Integer[] fields = (Integer[]) state.state;
                    sum = fields[0];
                    count = fields[1];
                }
                try {
                    fieldOutput.writeFloat((float) sum / count);
                } catch (IOException e) {
                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                }
            }

            @Override
            public void close() {
                // TODO Auto-generated method stub

            }

            @Override
            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
                    AggregateState state) throws HyracksDataException {
                int sum = 0, count = 0;
                int tupleOffset = accessor.getTupleStartOffset(tIndex);
                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
                sum += IntegerPointable.getInteger(accessor.getBuffer().array(), tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
                count += 1;
                if (!useObjectState) {
                    ByteBuffer buf = ByteBuffer.wrap(data);
                    sum += buf.getInt(offset);
                    count += buf.getInt(offset + 4);
                    buf.putInt(offset, sum);
                    buf.putInt(offset + 4, count);
                } else {
                    Integer[] fields = (Integer[]) state.state;
                    sum += fields[0];
                    count += fields[1];
                    state.state = new Integer[] { sum, count };
                }
            }

            @Override
            public boolean needsObjectState() {
                return useObjectState;
            }

            @Override
            public boolean needsBinaryState() {
                return !useObjectState;
            }

            @Override
            public AggregateState createState() {
                return new AggregateState(new Integer[] { 0, 0 });
            }

            @Override
            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                    throws HyracksDataException {
                int sum = 0;
                int count = 0;
                int tupleOffset = accessor.getTupleStartOffset(tIndex);
                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
                sum += IntegerPointable.getInteger(accessor.getBuffer().array(), tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
                count += IntegerPointable.getInteger(accessor.getBuffer().array(), tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
                if (!useObjectState) {
                    try {
                        fieldOutput.writeInt(sum);
                        fieldOutput.writeInt(count);
                    } catch (IOException e) {
                        throw new HyracksDataException("I/O exception when initializing the aggregator.");
                    }
                } else {
                    state.state = new Integer[] { sum, count };
                }
            }
        };
    }