in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java [61:174]
/**
 * Creates a field aggregator that computes the average of a 32-bit integer field.
 *
 * <p>The running state is a pair of ints, {@code sum} and {@code count}. Depending on the
 * enclosing factory's {@code useObjectState} flag it is kept either:
 * <ul>
 * <li>as binary state: 8 bytes in the caller-supplied {@code byte[]}, the sum at
 * {@code offset} and the count at {@code offset + 4}; or</li>
 * <li>as object state: an {@code Integer[] { sum, count }} stored in
 * {@link AggregateState#state}.</li>
 * </ul>
 *
 * <p>NOTE: the sum is accumulated with plain {@code int} arithmetic, so it wraps silently on
 * overflow.
 *
 * @param ctx                 task context; not used by this implementation
 * @param inRecordDescriptor  input record descriptor; not used by this implementation
 * @param outRecordDescriptor output record descriptor; not used by this implementation
 * @return a new aggregator instance
 * @throws HyracksDataException declared in the signature; nothing in this method body throws it
 *                              directly
 */
public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
        RecordDescriptor outRecordDescriptor) throws HyracksDataException {
    return new IFieldAggregateDescriptor() {

        /** Decodes the int value of the aggregated field ({@code aggField}) of tuple {@code tIndex}. */
        private int readInputValue(IFrameTupleAccessor accessor, int tIndex) {
            int tupleOffset = accessor.getTupleStartOffset(tIndex);
            int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
            // Field offsets are relative to the end of the tuple's field-slot area.
            return IntegerPointable.getInteger(accessor.getBuffer().array(),
                    tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
        }

        /** Reads the running sum from the object state or from the binary state at {@code offset}. */
        private int readSum(byte[] data, int offset, AggregateState state) {
            return useObjectState ? ((Integer[]) state.state)[0] : IntegerPointable.getInteger(data, offset);
        }

        /** Reads the running count from the object state or from the binary state at {@code offset + 4}. */
        private int readCount(byte[] data, int offset, AggregateState state) {
            return useObjectState ? ((Integer[]) state.state)[1]
                    : IntegerPointable.getInteger(data, offset + 4);
        }

        /** No per-aggregator state to reset. */
        @Override
        public void reset() {
        }

        /**
         * Writes the partial result as two ints: the running sum followed by the running count.
         *
         * @throws HyracksDataException if writing to {@code fieldOutput} fails; the underlying
         *                              {@link IOException} is attached as the cause
         */
        @Override
        public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                throws HyracksDataException {
            int sum = readSum(data, offset, state);
            int count = readCount(data, offset, state);
            try {
                fieldOutput.writeInt(sum);
                fieldOutput.writeInt(count);
            } catch (IOException e) {
                // Keep the original I/O failure as the cause so its stack trace is not lost.
                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.", e);
            }
        }

        /**
         * Writes the final result as a single float: {@code sum / count} using float division.
         *
         * @throws HyracksDataException if writing to {@code fieldOutput} fails; the underlying
         *                              {@link IOException} is attached as the cause
         */
        @Override
        public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                throws HyracksDataException {
            int sum = readSum(data, offset, state);
            int count = readCount(data, offset, state);
            try {
                // The cast binds to sum first, so the division is done in float, not int.
                fieldOutput.writeFloat((float) sum / count);
            } catch (IOException e) {
                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.", e);
            }
        }

        /**
         * Starts a new group from its first tuple: sum = the tuple's field value, count = 1.
         * With binary state the pair is written to {@code fieldOutput}; with object state it is
         * stored in {@code state.state}.
         *
         * @throws HyracksDataException if writing the initial binary state fails; the underlying
         *                              {@link IOException} is attached as the cause
         */
        @Override
        public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                throws HyracksDataException {
            int sum = readInputValue(accessor, tIndex);
            int count = 1;
            if (useObjectState) {
                state.state = new Integer[] { sum, count };
                return;
            }
            try {
                fieldOutput.writeInt(sum);
                fieldOutput.writeInt(count);
            } catch (IOException e) {
                throw new HyracksDataException("I/O exception when initializing the aggregator.", e);
            }
        }

        /** Nothing to release. */
        @Override
        public void close() {
        }

        /**
         * Folds one more tuple into the running state: adds the tuple's field value to the sum
         * and increments the count, then stores the pair back where it was read from.
         */
        @Override
        public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
                AggregateState state) throws HyracksDataException {
            int sum = readInputValue(accessor, tIndex) + readSum(data, offset, state);
            int count = 1 + readCount(data, offset, state);
            if (useObjectState) {
                state.state = new Integer[] { sum, count };
            } else {
                // Update the 8-byte binary state in place.
                ByteBuffer buf = ByteBuffer.wrap(data);
                buf.putInt(offset, sum);
                buf.putInt(offset + 4, count);
            }
        }

        /** @return {@code true} when the running state is kept in {@link AggregateState#state} */
        @Override
        public boolean needsObjectState() {
            return useObjectState;
        }

        /** @return {@code true} when the running state is kept in the binary buffer */
        @Override
        public boolean needsBinaryState() {
            return !useObjectState;
        }

        /** @return a fresh state holding {@code Integer[] { 0, 0 }} (sum, count) */
        @Override
        public AggregateState createState() {
            return new AggregateState(new Integer[] { 0, 0 });
        }
    };
}