public IAggregatorDescriptor createAggregator()

in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java [60:172]


    /**
     * Builds an {@link IAggregatorDescriptor} that delegates every operation to one
     * {@link IFieldAggregateDescriptor} per entry of {@code aggregatorFactories}.
     *
     * <p>State tuple addressing used by the returned descriptor: {@code keys.length} leading fields
     * (presumably the grouping keys, written by the caller before {@code init} is invoked), followed by
     * one field for each field aggregator whose {@code needsBinaryState()} returns true, in aggregator
     * order. Aggregators that do not need binary state contribute no field to the state tuple, so the
     * position of a binary-state field is NOT the aggregator's index; every method below therefore keeps
     * a separate counter of binary-state fields.
     *
     * <p>Side effect: if the factory's {@code keys} field is still {@code null}, it is set to
     * {@code keyFields}.
     *
     * @param ctx task context handed to each field-aggregator factory
     * @param inRecordDescriptor input record descriptor handed to each field-aggregator factory
     * @param outRecordDescriptor output record descriptor handed to each field-aggregator factory
     * @param keyFields key field indexes; only stored into {@code keys} when that field is unset
     * @param keyFieldsInPartialResults not used by this implementation
     * @return a descriptor combining all field aggregators
     * @throws HyracksDataException if creating a field aggregator fails
     */
    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
            throws HyracksDataException {

        final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
        for (int i = 0; i < aggregators.length; i++) {
            aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor);
        }

        if (this.keys == null) {
            this.keys = keyFields;
        }

        return new IAggregatorDescriptor() {

            /** Resets every field aggregator. */
            @Override
            public void reset() {
                for (IFieldAggregateDescriptor aggregator : aggregators) {
                    aggregator.reset();
                }
            }

            /**
             * Appends one partial-result field per aggregator to {@code tupleBuilder}.
             *
             * <p>Binary-state aggregators receive the state buffer and the offset of their own state
             * field; the others receive {@code (null, 0)}, the same convention used by
             * {@code outputFinalResult} and {@code aggregate}.
             *
             * @return always {@code true}
             */
            @Override
            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
                    int tIndex, AggregateState state) throws HyracksDataException {
                DataOutput dos = tupleBuilder.getDataOutput();
                AggregateState[] fieldStates = (AggregateState[]) state.state;

                // Field start offsets are relative to the end of the tuple's field-slot area.
                int stateBase = stateAccessor.getTupleStartOffset(tIndex) + stateAccessor.getFieldSlotsLength();
                // Counts binary-state fields only: the state tuple holds no field for other aggregators.
                int binaryFieldIndex = 0;
                for (int i = 0; i < aggregators.length; i++) {
                    if (aggregators[i].needsBinaryState()) {
                        int fieldOffset = stateAccessor.getFieldStartOffset(tIndex, keys.length + binaryFieldIndex);
                        aggregators[i].outputPartialResult(dos, stateAccessor.getBuffer().array(),
                                stateBase + fieldOffset, fieldStates[i]);
                        binaryFieldIndex++;
                    } else {
                        aggregators[i].outputPartialResult(dos, null, 0, fieldStates[i]);
                    }
                    tupleBuilder.addFieldEndOffset();
                }
                return true;
            }

            /**
             * Appends one final-result field per aggregator to {@code tupleBuilder}.
             *
             * <p>Binary-state aggregators receive the state buffer and the offset of their own state
             * field; the others receive {@code (null, 0)}.
             *
             * @return always {@code true}
             */
            @Override
            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                DataOutput dos = tupleBuilder.getDataOutput();
                AggregateState[] fieldStates = (AggregateState[]) state.state;

                // Field start offsets are relative to the end of the tuple's field-slot area.
                int stateBase = stateAccessor.getTupleStartOffset(tIndex) + stateAccessor.getFieldSlotsLength();
                // Counts binary-state fields only, matching how init() lays the state tuple out.
                int binaryFieldIndex = 0;
                for (int i = 0; i < aggregators.length; i++) {
                    if (aggregators[i].needsBinaryState()) {
                        int fieldOffset = stateAccessor.getFieldStartOffset(tIndex, keys.length + binaryFieldIndex);
                        aggregators[i].outputFinalResult(dos, stateAccessor.getBuffer().array(),
                                stateBase + fieldOffset, fieldStates[i]);
                        binaryFieldIndex++;
                    } else {
                        aggregators[i].outputFinalResult(dos, null, 0, fieldStates[i]);
                    }
                    tupleBuilder.addFieldEndOffset();
                }
                return true;
            }

            /**
             * Initializes every field aggregator from the input tuple at {@code tIndex}. A field is
             * closed in {@code tupleBuilder} only for aggregators that need binary state.
             */
            @Override
            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                DataOutput dos = tupleBuilder.getDataOutput();
                AggregateState[] fieldStates = (AggregateState[]) state.state;

                for (int i = 0; i < aggregators.length; i++) {
                    aggregators[i].init(accessor, tIndex, dos, fieldStates[i]);
                    if (aggregators[i].needsBinaryState()) {
                        tupleBuilder.addFieldEndOffset();
                    }
                }
            }

            /**
             * Creates one state per field aggregator and wraps the resulting array in a single
             * {@link AggregateState}; the other methods index that array by aggregator position.
             */
            @Override
            public AggregateState createAggregateStates() {
                AggregateState[] states = new AggregateState[aggregators.length];
                for (int i = 0; i < states.length; i++) {
                    states[i] = aggregators[i].createState();
                }
                return new AggregateState(states);
            }

            /** Closes every field aggregator. */
            @Override
            public void close() {
                for (IFieldAggregateDescriptor aggregator : aggregators) {
                    aggregator.close();
                }
            }

            /**
             * Folds the input tuple at {@code tIndex} into every field aggregator.
             *
             * <p>When {@code stateAccessor} is {@code null}, every aggregator is called with
             * {@code (null, 0)} and {@code needsBinaryState()} is not consulted. Otherwise binary-state
             * aggregators receive the state buffer and the offset of their own state field.
             */
            @Override
            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
                AggregateState[] fieldStates = (AggregateState[]) state.state;
                boolean hasStateTuple = stateAccessor != null;

                // Field start offsets are relative to the end of the tuple's field-slot area.
                int stateBase = hasStateTuple
                        ? stateAccessor.getTupleStartOffset(stateTupleIndex) + stateAccessor.getFieldSlotsLength()
                        : 0;
                // Counts binary-state fields only: the state tuple holds no field for other aggregators.
                int binaryFieldIndex = 0;
                for (int i = 0; i < aggregators.length; i++) {
                    if (hasStateTuple && aggregators[i].needsBinaryState()) {
                        int stateFieldOffset = stateAccessor.getFieldStartOffset(stateTupleIndex, keys.length
                                + binaryFieldIndex);
                        aggregators[i].aggregate(accessor, tIndex, stateAccessor.getBuffer().array(),
                                stateBase + stateFieldOffset, fieldStates[i]);
                        binaryFieldIndex++;
                    } else {
                        aggregators[i].aggregate(accessor, tIndex, null, 0, fieldStates[i]);
                    }
                }
            }
        };
    }