public IFieldAggregateDescriptor createAggregator()

in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java [66:204]


    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
        /*
         * Builds the aggregator that tracks one string per group for the field at index aggField.
         *
         * NOTE: "min"/"max" here is decided by String.length(), not by lexicographic order: when
         * isMax is set a strictly longer string replaces the stored one, otherwise a strictly
         * shorter string does. Ties keep the string that was stored first.
         *
         * State layout:
         *  - hasBinaryState == true: state.state is an Object[] shared by all groups. Slot 0 holds
         *    the number of groups registered so far (boxed Integer); slot i (i >= 1) holds the
         *    current string of the i-th group. The group's slot index is written to the binary
         *    state as an int by init() and read back from (data, offset) afterwards.
         *  - hasBinaryState == false: state.state is the current string itself.
         */
        return new IFieldAggregateDescriptor() {

            private final UTF8StringSerializerDeserializer utf8SerializerDeserializer =
                    new UTF8StringSerializerDeserializer();

            @Override
            public void reset() {
            }

            /**
             * Writes the group's current string to {@code fieldOutput} via {@code writeUTF}.
             *
             * @throws HyracksDataException if the underlying write fails; the I/O failure is kept as the cause
             */
            @Override
            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                    throws HyracksDataException {
                writeCurrentValue(fieldOutput, data, offset, state);
            }

            /**
             * Writes the group's current string to {@code fieldOutput} via {@code writeUTF}; the final
             * result has the same form as the partial result.
             *
             * @throws HyracksDataException if the underlying write fails; the I/O failure is kept as the cause
             */
            @Override
            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                    throws HyracksDataException {
                writeCurrentValue(fieldOutput, data, offset, state);
            }

            /**
             * Starts a new group with the string found in tuple {@code tIndex}.
             *
             * With binary state, the string is appended to the shared slot array (growing it when
             * needed) and the assigned slot index is written to {@code fieldOutput}. Without binary
             * state, the string simply becomes {@code state.state}.
             *
             * @throws HyracksDataException if the field cannot be deserialized or the slot index cannot be written
             */
            @Override
            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                    throws HyracksDataException {
                String strField = readAggField(accessor, tIndex);
                if (!hasBinaryState) {
                    // Only object-state
                    state.state = strField;
                    return;
                }

                // Object-binary-state
                Object[] storedState;
                if (state.state == null) {
                    storedState = new Object[8];
                    storedState[0] = Integer.valueOf(0);
                    state.state = storedState;
                } else {
                    storedState = (Object[]) state.state;
                }

                int stateCount = (Integer) storedState[0];
                if (stateCount + 1 >= storedState.length) {
                    // Double the slot array before the next index would fall outside it.
                    storedState = Arrays.copyOf(storedState, storedState.length * 2);
                    state.state = storedState;
                }

                stateCount++;
                storedState[0] = stateCount;
                storedState[stateCount] = strField;
                try {
                    fieldOutput.writeInt(stateCount);
                } catch (IOException e) {
                    // Wrap the exception as-is; calling fillInStackTrace() would overwrite the
                    // stack trace that points at the actual I/O failure.
                    throw new HyracksDataException(e);
                }
            }

            @Override
            public void close() {
                // Nothing to release.
            }

            /**
             * Folds the string found in tuple {@code tIndex} into the group's current value, replacing
             * it only when the new string is strictly longer (max) or strictly shorter (min).
             *
             * @throws HyracksDataException if the field cannot be deserialized
             */
            @Override
            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
                    AggregateState state) throws HyracksDataException {
                String candidate = readAggField(accessor, tIndex);
                if (hasBinaryState) {
                    int stateIdx = IntegerPointable.getInteger(data, offset);
                    Object[] storedState = (Object[]) state.state;
                    if (replaces(candidate, (String) storedState[stateIdx])) {
                        storedState[stateIdx] = candidate;
                    }
                } else if (replaces(candidate, (String) state.state)) {
                    state.state = candidate;
                }
            }

            public boolean needsObjectState() {
                return true;
            }

            public boolean needsBinaryState() {
                return hasBinaryState;
            }

            public AggregateState createState() {
                return new AggregateState();
            }

            /** Deserializes the aggregated field (index aggField) of tuple {@code tIndex} as a string. */
            private String readAggField(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                int tupleOffset = accessor.getTupleStartOffset(tIndex);
                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
                int fieldLength = accessor.getFieldLength(tIndex, aggField);
                // Field data begins after the tuple's field-slot area.
                int fieldOffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
                return utf8SerializerDeserializer.deserialize(new DataInputStream(
                        new ByteArrayInputStream(accessor.getBuffer().array(), fieldOffset, fieldLength)));
            }

            /** Returns whether {@code candidate} should replace {@code current}, comparing by length. */
            private boolean replaces(String candidate, String current) {
                return isMax ? candidate.length() > current.length() : candidate.length() < current.length();
            }

            /** Looks up the group's current string and writes it to {@code fieldOutput}. */
            private void writeCurrentValue(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                    throws HyracksDataException {
                String value;
                if (hasBinaryState) {
                    int stateIdx = IntegerPointable.getInteger(data, offset);
                    value = (String) ((Object[]) state.state)[stateIdx];
                } else {
                    value = (String) state.state;
                }
                try {
                    fieldOutput.writeUTF(value);
                } catch (IOException e) {
                    // Keep the original failure as the cause so it is not lost to callers.
                    throw new HyracksDataException(
                            "I/O exception when writing a string to the output writer in MinMaxStringAggregatorFactory.",
                            e);
                }
            }

        };
    }