public ISpillableTable buildSpillableTable()

in hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java [68:327]


    /**
     * Builds a hash-based {@link ISpillableTable} used for grouping/aggregation.
     * <p>
     * The returned table keeps aggregated tuples in a partitioned data table (a
     * {@code VPartitionTupleBufferManager}) and keeps pointers to those tuples in a
     * {@code SerializableHashTable}. Both structures allocate from one shared frame pool that is created with a
     * budget of {@code framesLimit * ctx.getInitialFrameSize()} bytes. The entries of the hash table are divided
     * into contiguous ranges, one range per partition, so that a whole partition can be flushed or cleared.
     *
     * @param ctx
     *            the task context; used to size frames and to create the frame pool, hash table and output frame
     * @param suggestTableSize
     *            the number of entries of the hash table
     * @param inputDataBytesSize
     *            the size of the input in bytes; divided by the initial frame size to decide the partition count
     * @param gbyFields
     *            the positions of the group-by fields in the incoming tuples
     * @param fdFields
     *            the positions of additional fields that are copied right after the group-by fields into every
     *            aggregated tuple; may be {@code null} when there are none
     * @param comparators
     *            the comparators used to match the group-by fields of an incoming tuple against an already
     *            aggregated tuple
     * @param firstKeyNormalizerFactory
     *            not referenced by this method
     * @param aggregateFactory
     *            the factory that creates the aggregator applied to each group
     * @param inRecordDescriptor
     *            the record descriptor of the incoming tuples
     * @param outRecordDescriptor
     *            the record descriptor of the aggregated tuples
     * @param framesLimit
     *            the number of frames available to the table; must be at least {@code MIN_FRAME_LIMT}
     * @param seed
     *            the seed passed to the hash partitioners
     * @return a new spillable table
     * @throws HyracksDataException
     *             if {@code framesLimit} is smaller than {@code MIN_FRAME_LIMT}
     */
    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize,
            long inputDataBytesSize, final int[] gbyFields, final int[] fdFields, final IBinaryComparator[] comparators,
            final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
            final int seed) throws HyracksDataException {
        final int tableSize = suggestTableSize;

        // For HashTable, we need to have at least two frames (one for header and one for content).
        // For DataTable, we need to have at least one frame.
        // For the output, we need to have at least one frame.
        if (framesLimit < MIN_FRAME_LIMT) {
            throw new HyracksDataException("The given frame limit is too small to partition the data.");
        }

        // In an aggregated (intermediate) tuple the group-by fields always occupy positions 0..gbyFields.length-1.
        final int[] intermediateResultGbyFields = new int[gbyFields.length];
        for (int i = 0; i < gbyFields.length; i++) {
            intermediateResultGbyFields[i] = i;
        }

        // allFields: positions (in the incoming tuple) of every field copied into an aggregated tuple.
        // intermediateResultAllFields: positions of those same fields inside the aggregated tuple.
        final int[] allFields;
        final int[] intermediateResultAllFields;
        if (fdFields == null) {
            // no need to combine gby and fd
            allFields = gbyFields;
            intermediateResultAllFields = intermediateResultGbyFields;
        } else {
            // Group-by fields first, then the fd fields; in the aggregated tuple they are laid out contiguously,
            // so the intermediate positions are simply 0..totalFields-1.
            final int totalFields = gbyFields.length + fdFields.length;
            allFields = new int[totalFields];
            System.arraycopy(gbyFields, 0, allFields, 0, gbyFields.length);
            System.arraycopy(fdFields, 0, allFields, gbyFields.length, fdFields.length);
            intermediateResultAllFields = new int[totalFields];
            for (int i = 0; i < totalFields; i++) {
                intermediateResultAllFields[i] = i;
            }
        }

        // Compares the group-by fields of an incoming tuple with those of an already aggregated tuple.
        final FrameTuplePairComparator ftpcInputCompareToAggregate =
                new FrameTuplePairComparator(gbyFields, intermediateResultGbyFields, comparators);

        // Hashes incoming tuples on their group-by fields.
        final ITuplePartitionComputer tpc =
                new FieldHashPartitionComputerFamily(gbyFields, hashFunctionFamilies).createPartitioner(seed);

        // For calculating hash value for the already aggregated tuples (not incoming tuples)
        // This computer is required to calculate the hash value of an aggregated tuple
        // while doing the garbage collection work on Hash Table.
        final ITuplePartitionComputer tpcIntermediate =
                new FieldHashPartitionComputerFamily(intermediateResultGbyFields, hashFunctionFamilies)
                        .createPartitioner(seed);

        final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
                outRecordDescriptor, allFields, intermediateResultAllFields, null, -1);

        final AggregateState aggregateState = aggregator.createAggregateStates();

        // Reused for every new group: holds the copied key fields plus the initial aggregate state.
        final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);

        //TODO(jf) research on the optimized partition size
        // Frames left after reserving the output frame(s) and the minimum hash table frames, but never less than
        // the minimum needed by the data table and the hash table together.
        long memoryBudget = Math.max(MIN_DATA_TABLE_FRAME_LIMT + MIN_HASH_TABLE_FRAME_LIMT,
                framesLimit - OUTPUT_FRAME_LIMT - MIN_HASH_TABLE_FRAME_LIMT);

        final int numPartitions = getNumOfPartitions(inputDataBytesSize / ctx.getInitialFrameSize(), memoryBudget);
        // Rounded up so that numPartitions ranges of this size cover all tableSize entries.
        final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("created hashtable, table size:{} file size:{}  #partitions:{}", tableSize, inputDataBytesSize,
                    numPartitions);
        }

        // Reused for every tuple written out by flushFrames().
        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);

        return new ISpillableTable() {

            // Scratch pointer shared by insert() and flushFrames(); overwritten on every lookup/insertion.
            private final TuplePointer pointer = new TuplePointer();
            // Bit p is set once partition p has been flushed through flushFrames().
            private final BitSet spilledSet = new BitSet(numPartitions);
            // This frame pool will be shared by both data table and hash table.
            private final IDeallocatableFramePool framePool =
                    new DeallocatableFramePool(ctx, framesLimit * ctx.getInitialFrameSize());
            // buffer manager for hash table
            private final ISimpleFrameBufferManager bufferManagerForHashTable =
                    new FramePoolBackedFrameBufferManager(framePool);

            private final ISerializableTable hashTableForTuplePointer =
                    new SerializableHashTable(tableSize, ctx, bufferManagerForHashTable);

            // buffer manager for data table
            final IPartitionedTupleBufferManager bufferManager = new VPartitionTupleBufferManager(
                    PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledSet),
                    numPartitions, framePool);

            // Reads aggregated tuples out of the data table given a tuple pointer.
            final ITuplePointerAccessor bufferAccessor = bufferManager.getTuplePointerAccessor(outRecordDescriptor);

            private final PreferToSpillFullyOccupiedFramePolicy spillPolicy =
                    new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledSet);

            private final FrameTupleAppender outputAppender = new FrameTupleAppender(new VSizeFrame(ctx));

            /**
             * Closes the hash table and the aggregator.
             */
            @Override
            public void close() throws HyracksDataException {
                // Close the aggregator even when closing the hash table fails; otherwise a failure in the first
                // call would skip the second one and leak whatever the aggregator holds.
                try {
                    hashTableForTuplePointer.close();
                } finally {
                    aggregator.close();
                }
            }

            /**
             * Removes every hash table entry that belongs to the given partition, lets the hash table collect
             * garbage if it reports that this is needed, and then clears the partition in the data table.
             */
            @Override
            public void clear(int partition) throws HyracksDataException {
                for (int p = getFirstEntryInHashTable(partition); p < getLastEntryInHashTable(partition); p++) {
                    hashTableForTuplePointer.delete(p);
                }

                // Checks whether the garbage collection is required and conducts a garbage collection if so.
                collectGarbageInHashTableForTuplePointer(false);
                bufferManager.clearPartition(partition);
            }

            /**
             * Runs a garbage collection on the hash table when {@code force} is set or when the hash table
             * reports that one is needed.
             *
             * @return {@code true} if a collection ran and its result was not -1; {@code false} otherwise
             */
            private boolean collectGarbageInHashTableForTuplePointer(boolean force) throws HyracksDataException {
                if (force || hashTableForTuplePointer.isGarbageCollectionNeeded()) {
                    int numberOfFramesReclaimed =
                            hashTableForTuplePointer.collectGarbage(bufferAccessor, tpcIntermediate);
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace("Garbage Collection on Hash table is done. Deallocated frames:{}",
                                numberOfFramesReclaimed);
                    }
                    // NOTE(review): -1 presumably means the collection could not be carried out - confirm against
                    // ISerializableTable.collectGarbage().
                    return numberOfFramesReclaimed != -1;
                }
                return false;
            }

            /** Maps a hash table entry to the partition whose entry range contains it. */
            private int getPartition(int entryInHashTable) {
                return entryInHashTable / entriesPerPartition;
            }

            /** Returns the first hash table entry (inclusive) of the given partition. */
            private int getFirstEntryInHashTable(int partition) {
                return partition * entriesPerPartition;
            }

            /** Returns the end (exclusive) of the given partition's entry range, capped at the table size. */
            private int getLastEntryInHashTable(int partition) {
                return Math.min(tableSize, (partition + 1) * entriesPerPartition);
            }

            /**
             * Aggregates the given tuple into its group, creating the group if it does not exist yet.
             *
             * @return {@code true} if the tuple was aggregated into an existing group or a new group was created;
             *         {@code false} if a new group was needed but could not be inserted
             */
            @Override
            public boolean insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                int entryInHashTable = tpc.partition(accessor, tIndex, tableSize);
                // Scan the tuples chained under this hash entry for one with equal group-by fields.
                for (int i = 0; i < hashTableForTuplePointer.getTupleCount(entryInHashTable); i++) {
                    hashTableForTuplePointer.getTuplePointer(entryInHashTable, i, pointer);
                    bufferAccessor.reset(pointer);
                    int c = ftpcInputCompareToAggregate.compare(accessor, tIndex, bufferAccessor);
                    if (c == 0) {
                        aggregateExistingTuple(accessor, tIndex, bufferAccessor, pointer.getTupleIndex());
                        return true;
                    }
                }
                return insertNewAggregateEntry(entryInHashTable, accessor, tIndex);
            }

            /**
             * Inserts a new aggregate entry into the data table and hash table.
             * This insertion must be an atomic operation. We cannot have a partial success or failure.
             * So, if an insertion succeeds on the data table and the same insertion on the hash table fails, then
             * we need to revert the effect of data table insertion.
             */
            private boolean insertNewAggregateEntry(int entryInHashTable, IFrameTupleAccessor accessor, int tIndex)
                    throws HyracksDataException {
                initStateTupleBuilder(accessor, tIndex);
                int pid = getPartition(entryInHashTable);

                // Insertion to the data table
                if (!bufferManager.insertTuple(pid, stateTupleBuilder.getByteArray(),
                        stateTupleBuilder.getFieldEndOffsets(), 0, stateTupleBuilder.getSize(), pointer)) {
                    return false;
                }

                // Insertion to the hash table
                boolean inserted = hashTableForTuplePointer.insert(entryInHashTable, pointer);
                if (!inserted) {
                    // Force garbage collection on the hash table and attempt to insert again
                    if (collectGarbageInHashTableForTuplePointer(true)) {
                        inserted = hashTableForTuplePointer.insert(entryInHashTable, pointer);
                    }
                    if (!inserted) {
                        // To preserve the atomicity of this method, we need to undo the effect
                        // of the above bufferManager.insertTuple() call since the given insertion has failed.
                        bufferManager.cancelInsertTuple(pid);
                        return false;
                    }
                }

                return true;
            }

            /**
             * Rebuilds {@code stateTupleBuilder} for a new group: copies the fields listed in {@code allFields}
             * from the incoming tuple and then lets the aggregator initialize its state for that tuple.
             */
            private void initStateTupleBuilder(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                stateTupleBuilder.reset();
                for (int k = 0; k < allFields.length; k++) {
                    stateTupleBuilder.addField(accessor, tIndex, allFields[k]);
                }
                aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
            }

            /** Folds the incoming tuple into the already aggregated tuple referenced by {@code bufferAccessor}. */
            private void aggregateExistingTuple(IFrameTupleAccessor accessor, int tIndex,
                    ITuplePointerAccessor bufferAccessor, int tupleIndex) throws HyracksDataException {
                aggregator.aggregate(accessor, tIndex, bufferAccessor, tupleIndex, aggregateState);
            }

            /**
             * Writes the aggregated tuples of the given partition to {@code writer} and marks the partition as
             * spilled.
             *
             * @return the number of tuples found in the partition's hash entries, whether or not the aggregator
             *         produced output for them
             */
            @Override
            public int flushFrames(int partition, IFrameWriter writer, AggregateType type) throws HyracksDataException {
                int count = 0;
                final int lastEntry = getLastEntryInHashTable(partition);
                for (int hashEntryPid = getFirstEntryInHashTable(partition); hashEntryPid < lastEntry;
                        hashEntryPid++) {
                    // Nothing in this loop modifies the hash table, so the count is read once per entry.
                    final int tupleCount = hashTableForTuplePointer.getTupleCount(hashEntryPid);
                    count += tupleCount;
                    for (int tid = 0; tid < tupleCount; tid++) {
                        hashTableForTuplePointer.getTuplePointer(hashEntryPid, tid, pointer);
                        bufferAccessor.reset(pointer);
                        // Copy the key (group-by and fd) fields; the aggregator appends the aggregate result below.
                        outputTupleBuilder.reset();
                        for (int k = 0; k < intermediateResultAllFields.length; k++) {
                            outputTupleBuilder.addField(bufferAccessor.getBuffer().array(),
                                    bufferAccessor.getAbsFieldStartOffset(intermediateResultAllFields[k]),
                                    bufferAccessor.getFieldLength(intermediateResultAllFields[k]));
                        }

                        // For any type other than PARTIAL or FINAL, hasOutput stays false and the tuple is skipped.
                        boolean hasOutput = false;
                        switch (type) {
                            case PARTIAL:
                                hasOutput = aggregator.outputPartialResult(outputTupleBuilder, bufferAccessor,
                                        pointer.getTupleIndex(), aggregateState);
                                break;
                            case FINAL:
                                hasOutput = aggregator.outputFinalResult(outputTupleBuilder, bufferAccessor,
                                        pointer.getTupleIndex(), aggregateState);
                                break;
                        }

                        // If the output frame is full, write it out and retry once on the emptied frame.
                        if (hasOutput && !outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                            outputAppender.write(writer, true);
                            if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                    outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                                throw new HyracksDataException("The output item is too large to be fit into a frame.");
                            }
                        }
                    }
                }
                // Write out whatever is still buffered in the output frame.
                outputAppender.write(writer, true);
                spilledSet.set(partition);
                return count;
            }

            /** Returns the number of partitions reported by the data table's buffer manager. */
            @Override
            public int getNumPartitions() {
                return bufferManager.getNumPartitions();
            }

            /**
             * Determines the partition the given tuple hashes to and asks the spill policy to select a victim
             * partition for it.
             */
            @Override
            public int findVictimPartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                int entryInHashTable = tpc.partition(accessor, tIndex, tableSize);
                int partition = getPartition(entryInHashTable);
                return spillPolicy.selectVictimPartition(partition);
            }
        };
    }