in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java [60:239]
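
// Builds the in-memory hash table used for (pre-)aggregation. The table's entries
// are divided into partitions so that individual partitions can be flushed to the
// writer and spilled when the frame budget is exhausted.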
public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize, long dataBytesSize,
        final int[] keyFields, final IBinaryComparator[] comparators,
        final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
        RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
        final int seed) throws HyracksDataException {
    if (framesLimit < 2) {
        throw new HyracksDataException("The frame limit is too small to partition the data");
    }
    final int tableSize = suggestTableSize;
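
    // State (intermediate) tuples store the grouping keys first, so within a state
    // tuple the keys are referenced by positions 0..keyFields.length - 1.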
    final int[] intermediateResultKeys = new int[keyFields.length];
    for (int i = 0; i < keyFields.length; i++) {
        intermediateResultKeys[i] = i;
    }
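
    // Compares the key fields of an incoming input tuple against the key fields of
    // a stored state tuple, to check whether the group already exists.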
    final FrameTuplePairComparator ftpcInputCompareToAggregate = new FrameTuplePairComparator(keyFields,
            intermediateResultKeys, comparators);
    final ITuplePartitionComputer tpc = new FieldHashPartitionComputerFamily(keyFields, hashFunctionFamilies)
            .createPartitioner(seed);
    final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
            outRecordDescriptor, keyFields, intermediateResultKeys, null);
    final AggregateState aggregateState = aggregator.createAggregateStates();
    final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);

    // TODO(jf) research the optimal partition size
    final int numPartitions = getNumOfPartitions((int) (dataBytesSize / ctx.getInitialFrameSize()),
            framesLimit - 1);
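
    // The hash-table entries are divided evenly among the partitions, so each
    // partition owns a contiguous range of entries.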
    final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
    if (LOGGER.isLoggable(Level.FINE)) {
        LOGGER.fine("create hashtable, table size:" + tableSize + " file size:" + dataBytesSize + " partitions:"
                + numPartitions);
    }
    final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
    final ISerializableTable hashTableForTuplePointer = new SerializableHashTable(tableSize, ctx);

    return new ISpillableTable() {

        private final TuplePointer pointer = new TuplePointer();
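
        // Partitions that have been spilled; the buffer manager constrains each
        // spilled partition to at most one in-memory frame.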
        private final BitSet spilledSet = new BitSet(numPartitions);

        final IPartitionedTupleBufferManager bufferManager = new VPartitionTupleBufferManager(ctx,
                PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledSet),
                numPartitions, framesLimit * ctx.getInitialFrameSize());

        final ITuplePointerAccessor bufferAccessor = bufferManager.getTupleAccessor(outRecordDescriptor);

        private final PreferToSpillFullyOccupiedFramePolicy spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(
                bufferManager, spilledSet, ctx.getInitialFrameSize());

        private final FrameTupleAppender outputAppender = new FrameTupleAppender(new VSizeFrame(ctx));

        @Override
        public void close() throws HyracksDataException {
            hashTableForTuplePointer.close();
            aggregator.close();
        }
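
        // Removes all hash-table entries belonging to the given partition and
        // releases the partition's frames in the buffer manager.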
        @Override
        public void clear(int partition) throws HyracksDataException {
            for (int p = getFirstEntryInHashTable(partition); p < getLastEntryInHashTable(partition); p++) {
                hashTableForTuplePointer.delete(p);
            }
            bufferManager.clearPartition(partition);
        }
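
        // Helpers mapping between hash-table entries and partitions: partition p owns
        // the contiguous entry range [p * entriesPerPartition, (p + 1) * entriesPerPartition),
        // capped at tableSize.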
        private int getPartition(int entryInHashTable) {
            return entryInHashTable / entriesPerPartition;
        }

        private int getFirstEntryInHashTable(int partition) {
            return partition * entriesPerPartition;
        }

        private int getLastEntryInHashTable(int partition) {
            return Math.min(tableSize, (partition + 1) * entriesPerPartition);
        }
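
        // Probes the hash table for an existing group with the same keys. If one is
        // found, the incoming tuple is aggregated into it; otherwise a new state
        // tuple is created. Returns false when the owning partition has no room for
        // a new state tuple.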
        @Override
        public boolean insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
            int entryInHashTable = tpc.partition(accessor, tIndex, tableSize);
            for (int i = 0; i < hashTableForTuplePointer.getTupleCount(entryInHashTable); i++) {
                hashTableForTuplePointer.getTuplePointer(entryInHashTable, i, pointer);
                bufferAccessor.reset(pointer);
                int c = ftpcInputCompareToAggregate.compare(accessor, tIndex, bufferAccessor);
                if (c == 0) {
                    aggregateExistingTuple(accessor, tIndex, bufferAccessor, pointer.tupleIndex);
                    return true;
                }
            }
            return insertNewAggregateEntry(entryInHashTable, accessor, tIndex);
        }
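
        // Builds the initial state tuple for a new group and stores it in the owning
        // partition; fails (returns false) if the buffer manager cannot fit the tuple.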
        private boolean insertNewAggregateEntry(int entryInHashTable, IFrameTupleAccessor accessor, int tIndex)
                throws HyracksDataException {
            initStateTupleBuilder(accessor, tIndex);
            int pid = getPartition(entryInHashTable);
            if (!bufferManager.insertTuple(pid, stateTupleBuilder.getByteArray(),
                    stateTupleBuilder.getFieldEndOffsets(), 0, stateTupleBuilder.getSize(), pointer)) {
                return false;
            }
            hashTableForTuplePointer.insert(entryInHashTable, pointer);
            return true;
        }
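
        // Copies the grouping keys into the state tuple and lets the aggregator
        // append its initial aggregate values.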
        private void initStateTupleBuilder(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
            stateTupleBuilder.reset();
            for (int k = 0; k < keyFields.length; k++) {
                stateTupleBuilder.addField(accessor, tIndex, keyFields[k]);
            }
            aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
        }

        private void aggregateExistingTuple(IFrameTupleAccessor accessor, int tIndex,
                ITuplePointerAccessor bufferAccessor, int tupleIndex) throws HyracksDataException {
            aggregator.aggregate(accessor, tIndex, bufferAccessor, tupleIndex, aggregateState);
        }
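
        // Writes all groups held by the given partition to the writer, emitting
        // either partial or final aggregation results, and marks the partition as
        // spilled. Returns the number of state tuples in the partition.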
        @Override
        public int flushFrames(int partition, IFrameWriter writer, AggregateType type) throws HyracksDataException {
            int count = 0;
            for (int hashEntryPid = getFirstEntryInHashTable(partition); hashEntryPid < getLastEntryInHashTable(
                    partition); hashEntryPid++) {
                count += hashTableForTuplePointer.getTupleCount(hashEntryPid);
                for (int tid = 0; tid < hashTableForTuplePointer.getTupleCount(hashEntryPid); tid++) {
                    hashTableForTuplePointer.getTuplePointer(hashEntryPid, tid, pointer);
                    bufferAccessor.reset(pointer);
                    outputTupleBuilder.reset();
                    for (int k = 0; k < intermediateResultKeys.length; k++) {
                        outputTupleBuilder.addField(bufferAccessor.getBuffer().array(),
                                bufferAccessor.getAbsFieldStartOffset(intermediateResultKeys[k]),
                                bufferAccessor.getFieldLength(intermediateResultKeys[k]));
                    }
                    boolean hasOutput = false;
                    switch (type) {
                        case PARTIAL:
                            hasOutput = aggregator.outputPartialResult(outputTupleBuilder, bufferAccessor,
                                    pointer.tupleIndex, aggregateState);
                            break;
                        case FINAL:
                            hasOutput = aggregator.outputFinalResult(outputTupleBuilder, bufferAccessor,
                                    pointer.tupleIndex, aggregateState);
                            break;
                    }
                    if (hasOutput && !outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                        outputAppender.write(writer, true);
                        if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                            throw new HyracksDataException("The output item is too large to fit into a frame.");
                        }
                    }
                }
            }
            outputAppender.write(writer, true);
            spilledSet.set(partition);
            return count;
        }

        @Override
        public int getNumPartitions() {
            return bufferManager.getNumPartitions();
        }
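
        // Picks the partition to spill when an insert fails, delegating to the
        // prefer-to-spill-fully-occupied-frame policy.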
        @Override
        public int findVictimPartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
            int entryInHashTable = tpc.partition(accessor, tIndex, tableSize);
            int partition = getPartition(entryInHashTable);
            return spillPolicy.selectVictimPartition(partition);
        }
    };
}