private void insertTuple()

in hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java [330:484]


    /**
     * Inserts {@code tuple} into the page {@code node}, splitting the page when it has no room
     * and creating a new root level when the page being split is the root.
     * <p>
     * Depending on the result of {@code hasSpaceInsert(tuple)} on the relevant frame:
     * <ul>
     * <li>enough contiguous space: the tuple is inserted directly;</li>
     * <li>enough space, but not contiguous: the frame is compacted first, then the tuple is
     * inserted;</li>
     * <li>not enough space: a fresh page is obtained from the free page manager, the node is split
     * into it, and {@code ctx.splitKey} is filled in for the caller. If {@code pageId} is the root
     * page, the root's content is moved to another fresh page and {@code node} is re-initialized
     * as the new root holding the two split-key entries.</li>
     * </ul>
     * For leaf inserts, {@code ctx.modificationCallback.found(null, tuple)} is invoked before the
     * tuple is written. Pages that were modified successfully are recorded in
     * {@code ctx.LSNUpdates} (and, for splits, {@code ctx.NSNUpdates}).
     * <p>
     * On failure, pages acquired by this method are un-latched and unpinned here. {@code node}
     * itself is released here only when {@code isLeaf} is true; for interior nodes that is left
     * to {@code updateParentForInsert}.
     *
     * @param node
     *            the page to insert into; presumably pinned and write-latched by the caller, since
     *            the failure paths release its write latch and unpin it
     * @param pageId
     *            the page id of {@code node}
     * @param tuple
     *            the tuple to insert
     * @param ctx
     *            the operation context; its leaf frame (when {@code isLeaf}) or interior frame
     *            (otherwise) is expected to already be positioned on {@code node}
     * @param isLeaf
     *            whether {@code node} is a leaf page
     * @throws HyracksDataException
     *             propagated from the buffer cache, frame or callback operations
     * @throws TreeIndexException
     *             propagated from the frame operations
     */
    private void insertTuple(ICachedPage node, int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf)
            throws HyracksDataException, TreeIndexException {
        boolean succeeded = false;
        FrameOpSpaceStatus spaceStatus;
        if (!isLeaf) {
            spaceStatus = ctx.interiorFrame.hasSpaceInsert(tuple);
        } else {
            spaceStatus = ctx.leafFrame.hasSpaceInsert(tuple);
        }

        switch (spaceStatus) {
            // Both cases insert in place; SUFFICIENT_SPACE only needs the frame compacted first.
            case SUFFICIENT_CONTIGUOUS_SPACE:
            case SUFFICIENT_SPACE: {
                boolean needsCompaction = spaceStatus == FrameOpSpaceStatus.SUFFICIENT_SPACE;
                try {
                    if (!isLeaf) {
                        if (needsCompaction) {
                            ctx.interiorFrame.compact();
                        }
                        ctx.interiorFrame.insert(tuple, -1);
                    } else {
                        if (needsCompaction) {
                            ctx.leafFrame.compact();
                        }
                        ctx.modificationCallback.found(null, tuple);
                        ctx.leafFrame.insert(tuple, -1);
                    }
                    succeeded = true;
                } finally {
                    if (succeeded) {
                        ctx.LSNUpdates.add(node);
                        // No split happened, so there is nothing to propagate to the parent.
                        ctx.splitKey.reset();
                    } else if (isLeaf) {
                        // In case of a crash, we un-latch the interior node
                        // inside updateParentForInsert.
                        node.releaseWriteLatch(true);
                        bufferCache.unpin(node);
                    }
                }
                break;
            }

            case INSUFFICIENT_SPACE: {
                // Allocate and write-latch the page that will become the right sibling.
                int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
                ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId), true);
                rightNode.acquireWriteLatch();

                try {
                    IRTreeFrame rightFrame;
                    if (!isLeaf) {
                        rightFrame = (IRTreeFrame) interiorFrameFactory.createFrame();
                        rightFrame.setPage(rightNode);
                        rightFrame.initBuffer((byte) ctx.interiorFrame.getLevel());
                        // The new page takes over the old right link; the split node then points to it.
                        rightFrame.setRightPage(ctx.interiorFrame.getRightPage());
                        ctx.interiorFrame.split(rightFrame, tuple, ctx.splitKey);
                        ctx.interiorFrame.setRightPage(rightPageId);
                    } else {
                        rightFrame = (IRTreeFrame) leafFrameFactory.createFrame();
                        rightFrame.setPage(rightNode);
                        rightFrame.initBuffer((byte) 0);
                        // Read the old right link from the leaf frame, i.e. the frame that is
                        // actually being split below, rather than from the interior frame.
                        rightFrame.setRightPage(ctx.leafFrame.getRightPage());
                        ctx.modificationCallback.found(null, tuple);
                        ctx.leafFrame.split(rightFrame, tuple, ctx.splitKey);
                        ctx.leafFrame.setRightPage(rightPageId);
                    }
                    succeeded = true;
                } finally {
                    if (succeeded) {
                        ctx.NSNUpdates.add(rightNode);
                        ctx.LSNUpdates.add(rightNode);
                        ctx.NSNUpdates.add(node);
                        ctx.LSNUpdates.add(node);
                    } else {
                        if (isLeaf) {
                            // In case of a crash, we un-latch the interior node
                            // inside updateParentForInsert.
                            node.releaseWriteLatch(true);
                            bufferCache.unpin(node);
                        }
                        rightNode.releaseWriteLatch(true);
                        bufferCache.unpin(rightNode);
                    }
                }
                ctx.splitKey.setPages(pageId, rightPageId);
                if (pageId == rootPage) {
                    // The root was split: keep the root at the same page id by moving its current
                    // content to a fresh page and rebuilding the root one level higher.
                    int newLeftId = freePageManager.getFreePage(ctx.metaFrame);
                    ICachedPage newLeftNode = bufferCache
                            .pin(BufferedFileHandle.getDiskPageId(fileId, newLeftId), true);
                    newLeftNode.acquireWriteLatch();
                    succeeded = false;
                    try {
                        // copy left child to new left child
                        System.arraycopy(node.getBuffer().array(), 0, newLeftNode.getBuffer().array(), 0, newLeftNode
                                .getBuffer().capacity());

                        // initialize new root (leftNode becomes new root)
                        ctx.interiorFrame.setPage(node);
                        ctx.interiorFrame.initBuffer((byte) (ctx.interiorFrame.getLevel() + 1));

                        // The left half now lives in the new page, so the split key must point there.
                        ctx.splitKey.setLeftPage(newLeftId);
                        ctx.interiorFrame.insert(ctx.splitKey.getLeftTuple(), -1);
                        ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), -1);

                        succeeded = true;
                    } finally {
                        if (succeeded) {
                            // Drop the entry for node added after the split above (the last one in
                            // each list) and re-add it after newLeftNode.
                            ctx.NSNUpdates.remove(ctx.NSNUpdates.size() - 1);
                            ctx.LSNUpdates.remove(ctx.LSNUpdates.size() - 1);

                            ctx.NSNUpdates.add(newLeftNode);
                            ctx.LSNUpdates.add(newLeftNode);

                            ctx.NSNUpdates.add(node);
                            ctx.LSNUpdates.add(node);
                            ctx.splitKey.reset();
                        } else {
                            if (isLeaf) {
                                // In case of a crash, we un-latch the interior node
                                // inside updateParentForInsert.
                                node.releaseWriteLatch(true);
                                bufferCache.unpin(node);
                            }
                            rightNode.releaseWriteLatch(true);
                            bufferCache.unpin(rightNode);
                            newLeftNode.releaseWriteLatch(true);
                            bufferCache.unpin(newLeftNode);
                        }
                    }
                }
                break;
            }
        }
    }