public boolean compress()

in hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/compressors/FieldPrefixCompressor.java [56:315]


    /**
     * Rewrites the tuples of a field-prefix leaf frame so that runs of tuples sharing the same
     * leading key fields store those fields once, in a separate prefix tuple.
     *
     * <p>The method works in four stages:
     * <ol>
     * <li>cheap early exits (empty frame, single key field, uncompressed ratio below
     * {@code ratioThreshold}, no key partitions found);</li>
     * <li>analysis: for every key partition pick the prefix length with the largest
     * {@code spaceBenefit - spaceCost}, and, if the prefix slots required exceed
     * {@code FieldPrefixSlotManager.MAX_PREFIX_SLOTS}, greedily keep only the best partitions;</li>
     * <li>rewrite: all tuples (compressed or not) and all prefix tuples are written into a temporary
     * array of the same capacity as the page, and the new slot values are collected in two int
     * arrays;</li>
     * <li>commit: if the result fits into the page, the temporary data and the slots are copied back
     * and the frame's space/count fields are updated.</li>
     * </ol>
     *
     * @param indexFrame
     *            frame to compress; it is cast to {@code BTreeFieldPrefixNSMLeafFrame} without a
     *            type check
     * @param cmp
     *            supplies the key field count and the per-field binary comparators used to detect
     *            matching prefixes
     * @return {@code true} if the page was rewritten; {@code false} if one of the early exits was
     *         taken or the rewritten layout would not fit into the page (the page content is then
     *         left as it was; only the empty-frame exit resets the frame's space fields)
     * @throws Exception
     *             if the number of prefix bytes actually written differs from the number computed
     *             during analysis
     */
    public boolean compress(ITreeIndexFrame indexFrame, MultiComparator cmp) throws Exception {
        BTreeFieldPrefixNSMLeafFrame frame = (BTreeFieldPrefixNSMLeafFrame) indexFrame;
        int tupleCount = frame.getTupleCount();
        // empty frame: drop all prefix tuples and reset the space bookkeeping to the frame's
        // "orig" values; nothing was compressed, so report false
        if (tupleCount <= 0) {
            frame.setPrefixTupleCount(0);
            frame.setFreeSpaceOff(frame.getOrigFreeSpaceOff());
            frame.setTotalFreeSpace(frame.getOrigTotalFreeSpace());
            return false;
        }

        // NOTE(review): with a single key field the method never compresses; presumably because
        // there is no proper field prefix to share - confirm the rationale
        if (cmp.getKeyFieldCount() == 1) {
            return false;
        }

        // only bother compressing when the share of uncompressed tuples among all tuples has
        // reached ratioThreshold
        int uncompressedTupleCount = frame.getUncompressedTupleCount();
        float ratio = (float) uncompressedTupleCount / (float) tupleCount;
        if (ratio < ratioThreshold)
            return false;

        IBinaryComparator[] cmps = cmp.getComparators();
        int fieldCount = typeTraits.length;

        ByteBuffer buf = frame.getBuffer();
        // backing array of the page; tuple fields are compared directly against it below
        byte[] pageArray = buf.array();
        IPrefixSlotManager slotManager = (IPrefixSlotManager) frame.getSlotManager();

        // perform analysis pass
        ArrayList<KeyPartition> keyPartitions = getKeyPartitions(frame, cmp, occurrenceThreshold);
        if (keyPartitions.size() == 0)
            return false;

        // for each keyPartition, determine the best prefix length for
        // compression, and count how many prefix tuples we would need in total
        int totalSlotsNeeded = 0;
        int totalPrefixBytes = 0;
        for (KeyPartition kp : keyPartitions) {

            // pmi[j] describes compressing on the first j + 1 fields (see fieldCountToCompress
            // below); remember the index with the largest net benefit
            for (int j = 0; j < kp.pmi.length; j++) {
                int benefitMinusCost = kp.pmi[j].spaceBenefit - kp.pmi[j].spaceCost;
                if (benefitMinusCost > kp.maxBenefitMinusCost) {
                    kp.maxBenefitMinusCost = benefitMinusCost;
                    kp.maxPmiIndex = j;
                }
            }

            // ignore keyPartitions with no benefit and don't count bytes and slots needed
            if (kp.maxBenefitMinusCost <= 0)
                continue;

            totalPrefixBytes += kp.pmi[kp.maxPmiIndex].prefixBytes;
            totalSlotsNeeded += kp.pmi[kp.maxPmiIndex].prefixSlotsNeeded;
        }

        // we use a greedy heuristic to solve this "knapsack"-like problem
        // (every keyPartition has a space savings and a number of slots
        // required, but the number of slots are constrained by MAX_PREFIX_SLOTS)
        // we sort the keyPartitions by maxBenefitMinusCost / prefixSlotsNeeded
        // and later choose the top MAX_PREFIX_SLOTS
        int[] newPrefixSlots;
        if (totalSlotsNeeded > FieldPrefixSlotManager.MAX_PREFIX_SLOTS) {
            // order keyPartitions by the heuristic function
            SortByHeuristic heuristicComparator = new SortByHeuristic();
            Collections.sort(keyPartitions, heuristicComparator);
            int slotsUsed = 0;
            // will hold (index of the first partition that no longer fits) + 1
            // NOTE(review): if the loop below never hit the break this would stay -1 and the
            // removal loop would drain the list and then index position -1; the break is
            // expected because totalSlotsNeeded exceeds the limit - assumes partitions with a
            // positive benefit are sorted first, verify against SortByHeuristic
            int numberKeyPartitions = -1;
            for (int i = 0; i < keyPartitions.size(); i++) {
                KeyPartition kp = keyPartitions.get(i);
                // NOTE(review): reads pmi[maxPmiIndex] for every partition visited, so it
                // relies on maxPmiIndex being a valid index for all partitions reached before
                // the break - confirm against KeyPartition's initial values
                slotsUsed += kp.pmi[kp.maxPmiIndex].prefixSlotsNeeded;
                if (slotsUsed > FieldPrefixSlotManager.MAX_PREFIX_SLOTS) {
                    numberKeyPartitions = i + 1;
                    // partition i does not fit: take its slots back out of the running total
                    slotsUsed -= kp.pmi[kp.maxPmiIndex].prefixSlotsNeeded;
                    break;
                }
            }
            newPrefixSlots = new int[slotsUsed];

            // remove irrelevant keyPartitions and adjust total prefix bytes
            // (the loop stops once the list has shrunk to numberKeyPartitions - 1 entries, i.e.
            // exactly the partitions that fit before the break above)
            while (keyPartitions.size() >= numberKeyPartitions) {
                int lastIndex = keyPartitions.size() - 1;
                KeyPartition kp = keyPartitions.get(lastIndex);
                // only partitions with a positive benefit were added to totalPrefixBytes
                if (kp.maxBenefitMinusCost > 0)
                    totalPrefixBytes -= kp.pmi[kp.maxPmiIndex].prefixBytes;
                keyPartitions.remove(lastIndex);
            }

            // re-order keyPartitions by prefix (corresponding to original order)
            SortByOriginalRank originalRankComparator = new SortByOriginalRank();
            Collections.sort(keyPartitions, originalRankComparator);
        } else {
            newPrefixSlots = new int[totalSlotsNeeded];
        }

        // one slot per tuple; filled back to front (entry tupleCount - 1 - i belongs to tuple i)
        int[] newTupleSlots = new int[tupleCount];

        // WARNING: our hope is that compression is infrequent
        // here we allocate a big chunk of memory to temporarily hold the new, re-compressed tuples
        // in general it is very hard to avoid this step
        // layout inside the temporary array: prefix tuples start at the frame's original free
        // space offset, regular tuples start right after the space reserved for all prefixes
        int prefixFreeSpace = frame.getOrigFreeSpaceOff();
        int tupleFreeSpace = prefixFreeSpace + totalPrefixBytes;
        // NOTE(review): the temporary array is only as large as the page and the size check
        // happens after all writes - presumably the rewritten data never exceeds it; confirm
        byte[] buffer = new byte[buf.capacity()];
        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);

        // perform compression, and reorg
        // we assume that the keyPartitions are sorted by the prefixes
        // (i.e., in the logical target order)
        int kpIndex = 0;
        int tupleIndex = 0;
        int prefixTupleIndex = 0;
        // recounted from scratch while rewriting the page
        uncompressedTupleCount = 0;

        TypeAwareTupleWriter tupleWriter = new TypeAwareTupleWriter(typeTraits);
        FieldPrefixTupleReference tupleToWrite = new FieldPrefixTupleReference(tupleWriter.createTupleReference());
        tupleToWrite.setFieldCount(fieldCount);

        while (tupleIndex < tupleCount) {
            if (kpIndex < keyPartitions.size()) {

                // beginning of keyPartition found, compress entire keyPartition
                if (tupleIndex == keyPartitions.get(kpIndex).firstTupleIndex) {

                    // number of fields we decided to use for compression of this keyPartition
                    int fieldCountToCompress = keyPartitions.get(kpIndex).maxPmiIndex + 1;
                    // a segment is a run of consecutive tuples that agree on the first
                    // fieldCountToCompress fields; it starts out containing only the first tuple
                    int segmentStart = keyPartitions.get(kpIndex).firstTupleIndex;
                    int tuplesInSegment = 1;

                    FieldPrefixTupleReference prevTuple = new FieldPrefixTupleReference(
                            tupleWriter.createTupleReference());
                    prevTuple.setFieldCount(fieldCount);

                    FieldPrefixTupleReference tuple = new FieldPrefixTupleReference(tupleWriter.createTupleReference());
                    tuple.setFieldCount(fieldCount);

                    // NOTE(review): for a partition with firstTupleIndex == lastTupleIndex this
                    // loop body never runs and that tuple is never written - presumably
                    // getKeyPartitions only returns partitions with at least two tuples; verify
                    for (int i = tupleIndex + 1; i <= keyPartitions.get(kpIndex).lastTupleIndex; i++) {
                        prevTuple.resetByTupleIndex(frame, i - 1);
                        tuple.resetByTupleIndex(frame, i);

                        // check if tuples match in fieldCountToCompress of their first fields
                        int prefixFieldsMatch = 0;
                        for (int j = 0; j < fieldCountToCompress; j++) {
                            if (cmps[j].compare(pageArray, prevTuple.getFieldStart(j), prevTuple.getFieldLength(j),
                                    pageArray, tuple.getFieldStart(j), tuple.getFieldLength(j)) == 0)
                                prefixFieldsMatch++;
                            else
                                break;
                        }

                        // the two tuples must match in exactly the number of fields we decided
                        // to compress for this keyPartition
                        // processSegments counts how many segments have to be flushed now:
                        // one if tuple i starts a new segment, plus one if tuple i is the last
                        // tuple of the partition (so the segment containing i must be flushed too)
                        int processSegments = 0;
                        if (prefixFieldsMatch == fieldCountToCompress)
                            tuplesInSegment++;
                        else
                            processSegments++;

                        if (i == keyPartitions.get(kpIndex).lastTupleIndex)
                            processSegments++;

                        for (int r = 0; r < processSegments; r++) {
                            // compress current segment and then start new segment
                            if (tuplesInSegment < occurrenceThreshold || fieldCountToCompress <= 0) {
                                // segment does not have at least occurrenceThreshold tuples
                                // (or no prefix length was chosen for this partition), so
                                // write tuples uncompressed
                                for (int j = 0; j < tuplesInSegment; j++) {
                                    int slotNum = segmentStart + j;
                                    tupleToWrite.resetByTupleIndex(frame, slotNum);
                                    // the slot records the offset before the write advances it
                                    newTupleSlots[tupleCount - 1 - slotNum] = slotManager.encodeSlotFields(
                                            FieldPrefixSlotManager.TUPLE_UNCOMPRESSED, tupleFreeSpace);
                                    tupleFreeSpace += tupleWriter.writeTuple(tupleToWrite, byteBuffer, tupleFreeSpace);
                                }
                                uncompressedTupleCount += tuplesInSegment;
                            } else {
                                // segment has enough tuples: compress segment, extract prefix,
                                // write prefix tuple to buffer, and set prefix slot
                                // NOTE(review): the prefix is taken from prevTuple (tuple i - 1);
                                // on a second pass of this loop the segment is the single tuple
                                // i, which only reaches this branch if occurrenceThreshold <= 1
                                // - confirm the threshold is always at least 2
                                newPrefixSlots[newPrefixSlots.length - 1 - prefixTupleIndex] = slotManager
                                        .encodeSlotFields(fieldCountToCompress, prefixFreeSpace);
                                prefixFreeSpace += tupleWriter.writeTupleFields(prevTuple, 0, fieldCountToCompress,
                                        byteBuffer.array(), prefixFreeSpace);

                                // truncate tuples, write them to buffer, and set tuple slots
                                // (each slot stores the index of the prefix tuple it refers to;
                                // only the fields after the prefix are written)
                                for (int j = 0; j < tuplesInSegment; j++) {
                                    int currTupleIndex = segmentStart + j;
                                    tupleToWrite.resetByTupleIndex(frame, currTupleIndex);
                                    newTupleSlots[tupleCount - 1 - currTupleIndex] = slotManager.encodeSlotFields(
                                            prefixTupleIndex, tupleFreeSpace);
                                    tupleFreeSpace += tupleWriter.writeTupleFields(tupleToWrite, fieldCountToCompress,
                                            fieldCount - fieldCountToCompress, byteBuffer.array(), tupleFreeSpace);
                                }

                                prefixTupleIndex++;
                            }

                            // begin new segment
                            segmentStart = i;
                            tuplesInSegment = 1;
                        }
                    }

                    // the whole partition has been written; together with the tupleIndex++ at
                    // the end of the outer loop this continues right after the partition
                    tupleIndex = keyPartitions.get(kpIndex).lastTupleIndex;
                    kpIndex++;
                } else {
                    // tuple lies before the next keyPartition:
                    // just write the tuple uncompressed
                    tupleToWrite.resetByTupleIndex(frame, tupleIndex);
                    newTupleSlots[tupleCount - 1 - tupleIndex] = slotManager.encodeSlotFields(
                            FieldPrefixSlotManager.TUPLE_UNCOMPRESSED, tupleFreeSpace);
                    tupleFreeSpace += tupleWriter.writeTuple(tupleToWrite, byteBuffer, tupleFreeSpace);
                    uncompressedTupleCount++;
                }
            } else {
                // no keyPartitions left:
                // just write the tuple uncompressed
                tupleToWrite.resetByTupleIndex(frame, tupleIndex);
                newTupleSlots[tupleCount - 1 - tupleIndex] = slotManager.encodeSlotFields(
                        FieldPrefixSlotManager.TUPLE_UNCOMPRESSED, tupleFreeSpace);
                tupleFreeSpace += tupleWriter.writeTuple(tupleToWrite, byteBuffer, tupleFreeSpace);
                uncompressedTupleCount++;
            }
            tupleIndex++;
        }

        // sanity check to see if we have written exactly as many prefix bytes as computed before
        if (prefixFreeSpace != frame.getOrigFreeSpaceOff() + totalPrefixBytes) {
            throw new Exception("ERROR: Number of prefix bytes written don't match computed number");
        }

        // in some rare instances our procedure could even increase the space requirement which is very dangerous
        // this can happen due to the greedy solution of the knapsack-like problem
        // therefore, we check if the new space exceeds the page size to avoid the only danger of
        // an increasing space
        int totalSpace = tupleFreeSpace + newTupleSlots.length * slotManager.getSlotSize() + newPrefixSlots.length
                * slotManager.getSlotSize();
        if (totalSpace > buf.capacity())
            // just leave the page as is
            return false;

        // copy new tuple and new slots into original page
        // (only the region from the original free space offset up to the end of the newly
        // written tuple data is overwritten)
        int freeSpaceAfterInit = frame.getOrigFreeSpaceOff();
        System.arraycopy(buffer, freeSpaceAfterInit, pageArray, freeSpaceAfterInit, tupleFreeSpace - freeSpaceAfterInit);

        // copy prefix slots
        // slots are laid out from the end of the page downwards: prefix slot 0 occupies the last
        // slot-sized position, followed (at lower offsets) by the remaining prefix slots
        int slotOffRunner = buf.capacity() - slotManager.getSlotSize();
        for (int i = 0; i < newPrefixSlots.length; i++) {
            buf.putInt(slotOffRunner, newPrefixSlots[newPrefixSlots.length - 1 - i]);
            slotOffRunner -= slotManager.getSlotSize();
        }

        // copy tuple slots
        // (continuing downwards from the prefix slots, in tuple order)
        for (int i = 0; i < newTupleSlots.length; i++) {
            buf.putInt(slotOffRunner, newTupleSlots[newTupleSlots.length - 1 - i]);
            slotOffRunner -= slotManager.getSlotSize();
        }

        // update space fields, TODO: we need to update more fields
        frame.setFreeSpaceOff(tupleFreeSpace);
        frame.setPrefixTupleCount(newPrefixSlots.length);
        frame.setUncompressedTupleCount(uncompressedTupleCount);
        // free space = page capacity minus everything up to the end of the tuple data minus
        // the space taken by all tuple and prefix slots
        int totalFreeSpace = buf.capacity() - tupleFreeSpace
                - ((newTupleSlots.length + newPrefixSlots.length) * slotManager.getSlotSize());
        frame.setTotalFreeSpace(totalFreeSpace);

        return true;
    }