in algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java [45:166]
/**
 * Builds the aggregator descriptor used by a group-by operator. The returned descriptor keeps
 * each aggregate's running state as serialized bytes stored in the tuple fields that follow the
 * key fields (the first state field is at index {@code keyFields.length}).
 *
 * @param ctx task context handed to each aggregate factory when its evaluator is first created
 * @param inRecordDescriptor not read by this implementation
 * @param outRecordDescriptor not read by this implementation
 * @param keyFields grouping key field indexes; only the array length is used here
 * @param keyFieldsInPartialResults not read by this implementation
 * @return a new descriptor; its evaluators are created lazily on the first {@code init} call
 * @throws HyracksDataException declared in the signature; the visible body only constructs the
 *         descriptor, whose methods wrap any {@code AlgebricksException} in this type
 */
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
        RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
        throws HyracksDataException {
    final int[] keys = keyFields;
    // One IAggregatorDescriptor instance is created per group-by operator.
    return new IAggregatorDescriptor() {
        // Reusable view over the input tuple currently being aggregated.
        private final FrameTupleReference inputTuple = new FrameTupleReference();
        // One evaluator slot per aggregate factory; slots are filled on first use in init().
        private final ISerializedAggregateEvaluator[] evaluators =
                new ISerializedAggregateEvaluator[aggFactories.length];
        // Index of the first aggregate-state field, i.e. the field right after the keys.
        private final int firstStateField = keys.length;
        // Byte length of each evaluator's state field as measured by the most recent init().
        // NOTE(review): these lengths are reused for every state tuple, which presumes each
        // evaluator's serialized state has a constant size - confirm against the evaluators.
        private final int[] stateLengths = new int[aggFactories.length];

        @Override
        public AggregateState createAggregateStates() {
            return new AggregateState();
        }

        /**
         * Appends one freshly initialized state field per evaluator to {@code tb}, then folds the
         * input tuple at {@code tIndex} into each of those states. {@code state} is not used.
         */
        @Override
        public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
                throws HyracksDataException {
            DataOutput stateOut = tb.getDataOutput();
            inputTuple.reset(accessor, tIndex);
            try {
                for (int i = 0; i < evaluators.length; i++) {
                    int sizeBefore = tb.getSize();
                    if (evaluators[i] == null) {
                        evaluators[i] = aggFactories[i].createAggregateEvaluator(ctx);
                    }
                    evaluators[i].init(stateOut);
                    tb.addFieldEndOffset();
                    // Remember how many bytes this evaluator's state occupies.
                    stateLengths[i] = tb.getSize() - sizeBefore;
                }
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }

            // Initial aggregation step over the same input tuple (reset again, as before).
            inputTuple.reset(accessor, tIndex);
            try {
                for (int i = 0; i < evaluators.length; i++) {
                    byte[] stateBytes = tb.getByteArray();
                    // A state field starts where the preceding field ends; with no keys the
                    // first state field has no predecessor and starts at offset 0.
                    int precedingField = i + keys.length - 1;
                    int stateStart = 0;
                    if (precedingField >= 0) {
                        stateStart = tb.getFieldEndOffsets()[precedingField];
                    }
                    evaluators[i].step(inputTuple, stateBytes, stateStart, stateLengths[i]);
                }
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }
        }

        /**
         * Folds the input tuple at {@code tIndex} into the states stored in the tuple at
         * {@code stateTupleIndex} of {@code stateAccessor}. {@code state} is not used.
         */
        @Override
        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                int stateTupleIndex, AggregateState state) throws HyracksDataException {
            inputTuple.reset(accessor, tIndex);
            int tupleStart = stateAccessor.getTupleStartOffset(stateTupleIndex);
            int slotsLength = stateAccessor.getFieldSlotsLength();
            try {
                for (int i = 0; i < evaluators.length; i++) {
                    byte[] stateBytes = stateAccessor.getBuffer().array();
                    int fieldOffset = stateAccessor.getFieldStartOffset(stateTupleIndex, i + keys.length);
                    // Field offset plus the tuple start and the field-slot area gives the
                    // position of the state bytes inside the backing array.
                    int stateStart = fieldOffset + tupleStart + slotsLength;
                    evaluators[i].step(inputTuple, stateBytes, stateStart, stateLengths[i]);
                }
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }
        }

        /**
         * Writes each evaluator's partial result for the state tuple at {@code tIndex} as one
         * field of {@code tb}. Always returns {@code true}. {@code state} is not used.
         */
        @Override
        public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor stateAccessor, int tIndex,
                AggregateState state) throws HyracksDataException {
            byte[] stateBytes = stateAccessor.getBuffer().array();
            int cursor = firstStateOffset(stateAccessor, tIndex);
            try {
                for (int i = 0; i < evaluators.length; i++) {
                    evaluators[i].finishPartial(stateBytes, cursor, stateLengths[i], tb.getDataOutput());
                    // States are laid out back to back, so advance by the recorded length.
                    cursor += stateLengths[i];
                    tb.addFieldEndOffset();
                }
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }
            return true;
        }

        /**
         * Writes each evaluator's final result for the state tuple at {@code tIndex} as one
         * field of {@code tb}. Always returns {@code true}. {@code state} is not used.
         */
        @Override
        public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor stateAccessor, int tIndex,
                AggregateState state) throws HyracksDataException {
            byte[] stateBytes = stateAccessor.getBuffer().array();
            int cursor = firstStateOffset(stateAccessor, tIndex);
            try {
                for (int i = 0; i < evaluators.length; i++) {
                    evaluators[i].finish(stateBytes, cursor, stateLengths[i], tb.getDataOutput());
                    // States are laid out back to back, so advance by the recorded length.
                    cursor += stateLengths[i];
                    tb.addFieldEndOffset();
                }
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }
            return true;
        }

        /** Intentionally empty: this descriptor has nothing to reset. */
        @Override
        public void reset() {
        }

        /** Closing simply delegates to {@link #reset()}. */
        @Override
        public void close() {
            reset();
        }

        // Position, within the accessor's backing array, of the first aggregate-state field of
        // the tuple at tIndex: tuple start + field-slot area + start offset of that field.
        private int firstStateOffset(IFrameTupleAccessor stateAccessor, int tIndex) {
            int tupleStart = stateAccessor.getTupleStartOffset(tIndex);
            int stateFieldOffset = stateAccessor.getFieldStartOffset(tIndex, firstStateField);
            return tupleStart + stateAccessor.getFieldSlotsLength() + stateFieldOffset;
        }
    };
}