in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java [66:204]
/**
 * Creates a field aggregator that keeps one string per group, taken from field
 * {@code aggField} of the incoming tuples.
 *
 * <p>NOTE(review): despite the "min/max" naming, candidates are compared by
 * {@link String#length()} only (longest string wins when {@code isMax} is set,
 * shortest otherwise), not lexicographically. Ties keep the value already stored.
 * This behavior is preserved as-is; confirm it is intended.
 *
 * <p>Two state layouts are supported, selected by {@code hasBinaryState}:
 * <ul>
 * <li>binary + object state: {@code AggregateState.state} holds an {@code Object[]} whose
 * slot 0 is an {@code Integer} count of used slots and whose slots 1..count hold the
 * per-group strings; the slot index of a group is written to the binary field output
 * in {@code init} and read back from {@code data}/{@code offset} in later calls
 * (presumably the same bytes; verify against the caller);</li>
 * <li>object state only: {@code AggregateState.state} holds the single current string.</li>
 * </ul>
 *
 * @param ctx
 *            task context; not used by this implementation
 * @param inRecordDescriptor
 *            input record descriptor; not used by this implementation
 * @param outRecordDescriptor
 *            output record descriptor; not used by this implementation
 * @return a new aggregator instance
 * @throws HyracksDataException
 *             declared by the signature; the visible body does not throw it during creation
 */
public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
        RecordDescriptor outRecordDescriptor) throws HyracksDataException {
    return new IFieldAggregateDescriptor() {
        private final UTF8StringSerializerDeserializer utf8SerializerDeserializer = new UTF8StringSerializerDeserializer();

        @Override
        public void reset() {
            // Nothing to reset: the aggregator itself keeps no per-group data.
        }

        /**
         * Writes the string currently stored for the group to {@code fieldOutput}.
         */
        @Override
        public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                throws HyracksDataException {
            writeStoredString(fieldOutput, data, offset, state);
        }

        /**
         * Writes the string currently stored for the group to {@code fieldOutput}; the final
         * result has the same form as the partial result.
         */
        @Override
        public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                throws HyracksDataException {
            writeStoredString(fieldOutput, data, offset, state);
        }

        /**
         * Starts a new group with the aggregated field of tuple {@code tIndex} as its value.
         * With binary state, a new slot is allocated in the shared {@code Object[]} and its
         * index is written to {@code fieldOutput}.
         */
        @Override
        public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                throws HyracksDataException {
            String strField = readAggregatedField(accessor, tIndex);
            if (!hasBinaryState) {
                // Only object-state
                state.state = strField;
                return;
            }

            // Object-binary-state
            Object[] storedState;
            if (state.state == null) {
                // Slot 0 is reserved for the number of slots in use.
                storedState = new Object[8];
                storedState[0] = Integer.valueOf(0);
                state.state = storedState;
            } else {
                storedState = (Object[]) state.state;
            }

            int nextSlot = (Integer) storedState[0] + 1;
            if (nextSlot >= storedState.length) {
                // Double the capacity and republish the array through the state object.
                storedState = Arrays.copyOf(storedState, storedState.length * 2);
                state.state = storedState;
            }
            storedState[0] = nextSlot;
            storedState[nextSlot] = strField;
            try {
                fieldOutput.writeInt(nextSlot);
            } catch (IOException e) {
                // Wrap the exception untouched so its original stack trace is preserved.
                throw new HyracksDataException(e);
            }
        }

        @Override
        public void close() {
            // No resources to release.
        }

        /**
         * Folds the aggregated field of tuple {@code tIndex} into the group's stored string,
         * replacing it when the new string is strictly longer ({@code isMax}) or strictly
         * shorter (otherwise).
         */
        @Override
        public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
                AggregateState state) throws HyracksDataException {
            String strField = readAggregatedField(accessor, tIndex);
            if (hasBinaryState) {
                int stateIdx = IntegerPointable.getInteger(data, offset);
                Object[] storedState = (Object[]) state.state;
                if (shouldReplace(strField, (String) storedState[stateIdx])) {
                    storedState[stateIdx] = strField;
                }
            } else if (shouldReplace(strField, (String) state.state)) {
                state.state = strField;
            }
        }

        public boolean needsObjectState() {
            // The strings themselves are always kept as Java objects.
            return true;
        }

        public boolean needsBinaryState() {
            return hasBinaryState;
        }

        public AggregateState createState() {
            return new AggregateState();
        }

        /** Deserializes field {@code aggField} of tuple {@code tIndex} from the accessor's buffer. */
        private String readAggregatedField(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
            int tupleOffset = accessor.getTupleStartOffset(tIndex);
            int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
            int fieldLength = accessor.getFieldLength(tIndex, aggField);
            return utf8SerializerDeserializer.deserialize(new DataInputStream(
                    new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
                            + accessor.getFieldSlotsLength() + fieldStart, fieldLength)));
        }

        /** Returns whether {@code candidate} should replace {@code current}, judged by string length. */
        private boolean shouldReplace(String candidate, String current) {
            return isMax ? candidate.length() > current.length() : candidate.length() < current.length();
        }

        /** Looks up the group's stored string and writes it with {@link DataOutput#writeUTF(String)}. */
        private void writeStoredString(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                throws HyracksDataException {
            try {
                if (hasBinaryState) {
                    int stateIdx = IntegerPointable.getInteger(data, offset);
                    Object[] storedState = (Object[]) state.state;
                    fieldOutput.writeUTF((String) storedState[stateIdx]);
                } else {
                    fieldOutput.writeUTF((String) state.state);
                }
            } catch (IOException e) {
                // Keep the underlying I/O failure as the cause instead of discarding it.
                throw new HyracksDataException(
                        "I/O exception when writing a string to the output writer in MinMaxStringAggregatorFactory.",
                        e);
            }
        }
    };
}