public void split()

in hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RStarTreePolicy.java [73:239]


    /**
     * Splits a full R-tree frame by distributing its tuples, together with the new tuple, between
     * {@code leftFrame} and {@code rightFrame}. The calculations follow the R*-tree paper.
     * <ol>
     * <li>Split axis: for every axis, the entries are sorted once by lower bound and once by upper
     * bound, every candidate distribution is generated, and the margins of the two resulting
     * rectangles are summed. The axis (and the sort key) with the smallest margin sum wins.</li>
     * <li>Split point: along the chosen axis and sort key, the distribution with the smallest
     * overlap between the two rectangles is chosen; ties are broken by the smallest total area.</li>
     * <li>Redistribution: the smaller side of the chosen distribution is moved to the right frame,
     * the moved slots are removed from the left frame, and both frames are compacted.</li>
     * <li>The new tuple is inserted into the frame its distribution side maps to, falling back to
     * the other frame when that frame has no sufficient contiguous space.</li>
     * <li>The MBRs of both frames are recomputed and written into the split key.</li>
     * </ol>
     *
     * @param leftFrame
     *            the frame being split; must be an {@code RTreeNSMFrame}
     * @param buf
     *            the buffer whose free-space counter is updated and from which the last tuple is
     *            read to size the split key (presumably the page buffer of {@code leftFrame};
     *            verify against the caller)
     * @param rightFrame
     *            the frame receiving the moved tuples; must be an {@code RTreeNSMFrame}
     * @param slotManager
     *            the slot manager used to invalidate and remove the moved slots; must be an
     *            {@code UnorderedSlotManager} (presumably the one of {@code leftFrame}; verify
     *            against the caller)
     * @param frameTuple
     *            a scratch tuple reference that is repeatedly reset while scanning the frame
     * @param tuple
     *            the new tuple whose insertion triggered the split
     * @param splitKey
     *            the split key receiving the MBRs of both frames; must be an {@code RTreeSplitKey}
     * @throws HyracksDataException
     *             if one of the underlying frame or tuple operations fails
     */
    public void split(ITreeIndexFrame leftFrame, ByteBuffer buf, ITreeIndexFrame rightFrame, ISlotManager slotManager,
            ITreeIndexTupleReference frameTuple, ITupleReference tuple, ISplitKey splitKey) throws HyracksDataException {
        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
        RTreeTypeAwareTupleWriter leftTupleWriter = ((RTreeTypeAwareTupleWriter) tupleWriter);
        RTreeTypeAwareTupleWriter rightTupleWriter = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());

        RTreeNSMFrame leftRTreeFrame = ((RTreeNSMFrame) leftFrame);

        // Number of tuples currently in the frame, and number of entries to distribute (the
        // frame's tuples plus the new tuple). Both are only used before the frame is mutated;
        // the analysis phase below is not expected to modify the frame.
        final int tupleCount = leftRTreeFrame.getTupleCount();
        final int entryCount = tupleCount + 1;

        // calculations are based on the R*-tree paper
        int m = (int) Math.floor(entryCount * splitFactor);
        int splitDistribution = tupleCount - (2 * m) + 2;

        // to calculate the minimum margin in order to pick the split axis
        double minMargin = Double.MAX_VALUE;
        // sortKeyOffset is added to splitAxis to address the field the entries are sorted by:
        // 0 selects the lower bound of the axis, maxFieldPos selects its upper bound.
        int splitAxis = 0, sortKeyOffset = 0;

        // The first half of the key fields holds the lower bounds and the second half the upper
        // bounds, so the upper bound of axis i lives in field maxFieldPos + i.
        int maxFieldPos = keyValueProviders.length / 2;
        for (int i = 0; i < maxFieldPos; i++) {
            int j = maxFieldPos + i;
            for (int k = 0; k < tupleCount; ++k) {

                frameTuple.resetByTupleIndex(leftRTreeFrame, k);
                double lowerKey = keyValueProviders[i]
                        .getValue(frameTuple.getFieldData(i), frameTuple.getFieldStart(i));
                double upperKey = keyValueProviders[j]
                        .getValue(frameTuple.getFieldData(j), frameTuple.getFieldStart(j));

                tupleEntries1.add(k, lowerKey);
                tupleEntries2.add(k, upperKey);
            }
            double newLowerKey = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
            double newUpperKey = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));

            // the new tuple is not in the frame yet; it is tracked with tuple index -1
            tupleEntries1.add(-1, newLowerKey);
            tupleEntries2.add(-1, newUpperKey);

            tupleEntries1.sort(EntriesOrder.ASCENDING, entryCount);
            tupleEntries2.sort(EntriesOrder.ASCENDING, entryCount);

            double lowerMargin = 0.0, upperMargin = 0.0;
            // generate distribution
            for (int k = 1; k <= splitDistribution; ++k) {
                int d = m - 1 + k;

                // rec[0]/rec[2]: first/second group when sorted by lower bound,
                // rec[1]/rec[3]: first/second group when sorted by upper bound
                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[0], 0, d);
                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries2, rec[1], 0, d);
                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[2], d, entryCount);
                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries2, rec[3], d, entryCount);

                // calculate the margin of the distributions
                lowerMargin += rec[0].margin() + rec[2].margin();
                upperMargin += rec[1].margin() + rec[3].margin();
            }
            double margin = Math.min(lowerMargin, upperMargin);

            // store minimum margin as split axis
            if (margin < minMargin) {
                minMargin = margin;
                splitAxis = i;
                // The upper bound of an axis is maxFieldPos fields after its lower bound (see j
                // above). A hard-coded offset of 2 is only correct for two-dimensional keys.
                sortKeyOffset = (lowerMargin < upperMargin) ? 0 : maxFieldPos;
            }

            tupleEntries1.clear();
            tupleEntries2.clear();
        }

        // Re-sort all entries along the chosen axis using the chosen bound.
        final int sortField = splitAxis + sortKeyOffset;
        for (int i = 0; i < tupleCount; ++i) {
            frameTuple.resetByTupleIndex(leftRTreeFrame, i);
            double key = keyValueProviders[sortField].getValue(frameTuple.getFieldData(sortField),
                    frameTuple.getFieldStart(sortField));
            tupleEntries1.add(i, key);
        }
        double newTupleKey = keyValueProviders[sortField].getValue(tuple.getFieldData(sortField),
                tuple.getFieldStart(sortField));
        tupleEntries1.add(-1, newTupleKey);
        tupleEntries1.sort(EntriesOrder.ASCENDING, entryCount);

        // Pick the split point: minimum overlap first, minimum total area as tie-breaker.
        double minArea = Double.MAX_VALUE;
        double minOverlap = Double.MAX_VALUE;
        int splitPoint = 0;
        for (int i = 1; i <= splitDistribution; ++i) {
            int d = m - 1 + i;

            generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[0], 0, d);
            generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[2], d, entryCount);

            double overlap = rec[0].overlappedArea(rec[2]);
            if (overlap < minOverlap) {
                splitPoint = d;
                minOverlap = overlap;
                minArea = rec[0].area() + rec[2].area();
            } else if (overlap == minOverlap) {
                double area = rec[0].area() + rec[2].area();
                if (area < minArea) {
                    splitPoint = d;
                    minArea = area;
                }
            }
        }

        // Move the smaller side of the distribution to the right frame.
        int startIndex, endIndex;
        if (splitPoint < entryCount / 2) {
            startIndex = 0;
            endIndex = splitPoint;
        } else {
            startIndex = splitPoint;
            endIndex = entryCount;
        }
        UnorderedSlotManager unorderedSlotManager = (UnorderedSlotManager) slotManager;
        boolean insertedNewTupleInRightFrame = false;
        int totalBytes = 0, numOfDeletedTuples = 0;
        for (int i = startIndex; i < endIndex; i++) {
            int tupleIndex = tupleEntries1.get(i).getTupleIndex();
            if (tupleIndex != -1) {
                frameTuple.resetByTupleIndex(leftRTreeFrame, tupleIndex);
                rightFrame.insert(frameTuple, -1);
                // mark the slot with -1 so that deleteEmptySlots() below removes it
                unorderedSlotManager.modifySlot(slotManager.getSlotOff(tupleIndex), -1);
                totalBytes += leftRTreeFrame.getTupleSize(frameTuple);
                numOfDeletedTuples++;
            } else {
                // the new tuple belongs to the moved side; it is inserted after compaction
                insertedNewTupleInRightFrame = true;
            }
        }

        unorderedSlotManager.deleteEmptySlots();

        // maintain space information
        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
                + (slotManager.getSlotSize() * numOfDeletedTuples));

        // compact both pages
        rightFrame.compact();
        leftRTreeFrame.compact();

        // The assumption here is that the new tuple cannot be larger than page
        // size, thus it must fit in either pages.
        if (insertedNewTupleInRightFrame) {
            if (rightFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
                rightFrame.insert(tuple, -1);
            } else {
                leftRTreeFrame.insert(tuple, -1);
            }
        } else if (leftRTreeFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
            leftRTreeFrame.insert(tuple, -1);
        } else {
            rightFrame.insert(tuple, -1);
        }

        // Size the split key from the key fields of the tuple at the slot end offset.
        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
        frameTuple.resetByTupleOffset(buf, tupleOff);
        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, keyValueProviders.length);

        // Recompute the MBR of each frame and store it in the split key.
        splitKey.initData(splitKeySize);
        leftRTreeFrame.adjustMBR();
        leftTupleWriter.writeTupleFields(leftRTreeFrame.getMBRTuples(), 0, rTreeSplitKey.getLeftPageBuffer(), 0);
        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);

        ((IRTreeFrame) rightFrame).adjustMBR();
        rightTupleWriter.writeTupleFields(((RTreeNSMFrame) rightFrame).getMBRTuples(), 0,
                rTreeSplitKey.getRightPageBuffer(), 0);
        rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);

        tupleEntries1.clear();
        tupleEntries2.clear();
    }