private void performOp()

in hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java [563:743]


    /**
     * Recursively descends from the page {@code pageId} and carries out the operation described by
     * {@code ctx}, using latch coupling: the current page is pinned and latched first, and only then
     * is the parent's latch released and the parent unpinned.
     *
     * <p>The page LSN of every visited page is pushed onto {@code ctx.getPageLsns()}. When a
     * structure modification interferes, the top of that list is replaced by {@code RESTART_OP} or
     * {@code FULL_RESTART_OP} so that the calling level can decide whether to descend again or to
     * keep unwinding.
     *
     * <p>Ownership of {@code node}: whenever this method releases and unpins {@code node} itself (or
     * hands it to a recursive call, which releases it as its parent), the local variable is set to
     * {@code null} so that the exception handler at the bottom never releases it a second time.
     *
     * @param pageId              id of the page to visit at this level
     * @param parent              the already pinned and latched parent page, or {@code null} if there is none
     * @param parentIsReadLatched {@code true} if {@code parent} holds a read latch, {@code false} if it
     *                            holds a write latch
     * @param ctx                 operation context (frames, predicate, split key, page-LSN trail, restart counter)
     * @throws HyracksDataException if the operation fails; any {@link Throwable} raised during the
     *                              descent is rethrown via {@code HyracksDataException.create}, including
     *                              {@code OPERATION_EXCEEDED_MAX_RESTARTS} once the restart counter
     *                              reaches {@code MAX_RESTARTS}
     */
    private void performOp(int pageId, ICachedPage parent, boolean parentIsReadLatched, BTreeOpContext ctx)
            throws HyracksDataException {
        ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), pageId));
        ctx.getInteriorFrame().setPage(node);
        // this check performs an unprotected read in the page
        // the following could happen: TODO fill out
        boolean unsafeIsLeaf = ctx.getInteriorFrame().isLeaf();
        // acquireLatch reports whether it took a read latch (true) or a write latch (false).
        boolean isReadLatched = acquireLatch(node, ctx, unsafeIsLeaf);
        boolean smFlag = ctx.getInteriorFrame().getSmFlag();
        // re-check leafness after latching
        boolean isLeaf = ctx.getInteriorFrame().isLeaf();

        // remember trail of pageLsns, to unwind recursion in case of an ongoing
        // structure modification
        ctx.getPageLsns().add(ctx.getInteriorFrame().getPageLsn());
        try {
            // Latch coupling: unlatch parent.
            if (parent != null) {
                if (parentIsReadLatched) {
                    parent.releaseReadLatch();
                } else {
                    parent.releaseWriteLatch(true);
                }
                bufferCache.unpin(parent);
            }
            if (!isLeaf || smFlag) {
                if (!smFlag) {
                    // We use this loop to deal with possibly multiple operation
                    // restarts due to ongoing structure modifications during
                    // the descent.
                    while (true) {
                        int childPageId = ctx.getInteriorFrame().getChildPageId(ctx.getPred());
                        performOp(childPageId, node, isReadLatched, ctx);
                        // The recursive call received node as its parent; it is no longer ours to release.
                        node = null;

                        if (!ctx.getPageLsns().isEmpty()) {
                            if (ctx.getPageLsns().getLast() == FULL_RESTART_OP) {
                                // Leave the indicator in place and keep unwinding.
                                break;
                            } else if (ctx.getPageLsns().getLast() == RESTART_OP) {
                                // Pop the restart op indicator.
                                ctx.getPageLsns().removeLast();
                                // A non-null result is treated below as a read-latched page we own again.
                                node = isConsistent(pageId, ctx);
                                if (node != null) {
                                    isReadLatched = true;
                                    // Descend the tree again.
                                    if (ctx.getOpRestarts() >= MAX_RESTARTS) {
                                        throw HyracksDataException.create(ErrorCode.OPERATION_EXCEEDED_MAX_RESTARTS,
                                                MAX_RESTARTS);
                                    }
                                    continue;
                                } else {
                                    // Pop pageLsn of this page (version seen by this op during descent).
                                    ctx.getPageLsns().removeLast();
                                    // This node is not consistent set the restart indicator for upper level.
                                    ctx.getPageLsns().add(RESTART_OP);
                                    break;
                                }
                            }
                        }

                        switch (ctx.getOperation()) {
                            case INSERT:
                            case UPSERT:
                            case UPDATE: {
                                // Is there a propagated split key?
                                if (ctx.getSplitKey().getBuffer() != null) {
                                    // Re-pin this level's page under a write latch to absorb the split key.
                                    ICachedPage interiorNode =
                                            bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), pageId));
                                    interiorNode.acquireWriteLatch();
                                    try {
                                        // Insert or update op. Both can cause split keys to propagate upwards.
                                        insertInterior(interiorNode, pageId, ctx.getSplitKey().getTuple(), ctx);
                                    } finally {
                                        interiorNode.releaseWriteLatch(true);
                                        bufferCache.unpin(interiorNode);
                                    }
                                } else {
                                    unsetSmPages(ctx);
                                }
                                break;
                            }

                            case DELETE: {
                                if (ctx.getSplitKey().getBuffer() != null) {
                                    throw new HyracksDataException(
                                            "Split key was propagated during delete. Delete allows empty leaf pages.");
                                }
                                break;
                            }

                            default: {
                                // Do nothing for Search and DiskOrderScan.
                                break;
                            }
                        }
                        // Operation completed.
                        break;
                    } // end while
                } else { // smFlag
                    ctx.setOpRestarts(ctx.getOpRestarts() + 1);
                    if (isReadLatched) {
                        node.releaseReadLatch();
                    } else {
                        node.releaseWriteLatch(true);
                    }
                    bufferCache.unpin(node);
                    // Already released and unpinned: make sure the catch block below cannot
                    // release/unpin this page a second time if anything after this point throws.
                    node = null;

                    // TODO: this should be an instant duration lock, how to do
                    // this in java?
                    // instead we just immediately release the lock. this is
                    // inefficient but still correct and will not cause
                    // latch-deadlock
                    treeLatch.readLock().lock();
                    treeLatch.readLock().unlock();

                    // unwind recursion and restart operation, find lowest page
                    // with a pageLsn as seen by this operation during descent
                    ctx.getPageLsns().removeLast(); // pop current page lsn
                    // put special value on the stack to inform caller of
                    // restart
                    ctx.getPageLsns().add(RESTART_OP);
                }
            } else { // isLeaf and !smFlag
                // We may have to restart an op to avoid latch deadlock.
                boolean restartOp = false;
                ctx.getLeafFrame().setPage(node);
                switch (ctx.getOperation()) {
                    case INSERT: {
                        int targetTupleIndex = ctx.getLeafFrame().findInsertTupleIndex(ctx.getPred().getLowKey());
                        restartOp = insertLeaf(ctx.getPred().getLowKey(), targetTupleIndex, pageId, ctx);
                        break;
                    }
                    case UPSERT: {
                        int targetTupleIndex = ctx.getLeafFrame().findUpsertTupleIndex(ctx.getPred().getLowKey());
                        restartOp = upsertLeaf(ctx.getPred().getLowKey(), targetTupleIndex, pageId, ctx);
                        break;
                    }
                    case UPDATE: {
                        int oldTupleIndex = ctx.getLeafFrame().findUpdateTupleIndex(ctx.getPred().getLowKey());
                        restartOp = updateLeaf(ctx.getPred().getLowKey(), oldTupleIndex, pageId, ctx);
                        break;
                    }
                    case DELETE: {
                        restartOp = deleteLeaf(node, pageId, ctx.getPred().getLowKey(), ctx);
                        break;
                    }
                    case SEARCH: {
                        // The leaf page is passed to the cursor through its initial state and is
                        // not released here (presumably the cursor releases it later; confirm
                        // against the cursor implementation).
                        ctx.getCursorInitialState().setSearchOperationCallback(ctx.getSearchCallback());
                        ctx.getCursorInitialState().setOriginialKeyComparator(ctx.getCmp());
                        ctx.getCursorInitialState().setPage(node);
                        ctx.getCursorInitialState().setPageId(pageId);
                        ctx.getCursor().open(ctx.getCursorInitialState(), ctx.getPred());
                        break;
                    }
                }
                if (ctx.getOperation() != IndexOperation.SEARCH) {
                    node.releaseWriteLatch(true);
                    bufferCache.unpin(node);
                    // Already released and unpinned: keep the catch block below from doing it again.
                    node = null;
                }
                if (restartOp) {
                    // Wait for the SMO to persistFrontiers before restarting.
                    // NOTE(review): an older comment here said the pin on the page was still held,
                    // but the release just above unpins the page for every non-SEARCH operation.
                    treeLatch.readLock().lock();
                    treeLatch.readLock().unlock();
                    // Replace this page's LSN with the full-restart indicator for the caller.
                    ctx.getPageLsns().removeLast();
                    ctx.getPageLsns().add(FULL_RESTART_OP);
                }
            }
        } catch (Throwable e) {
            // Only the first level that sees the failure cleans up, and only if it still owns node
            // (node is null once it has been released here or handed to a recursive call).
            if (!ctx.isExceptionHandled() && node != null) {
                if (isReadLatched) {
                    node.releaseReadLatch();
                } else {
                    node.releaseWriteLatch(true);
                }
                bufferCache.unpin(node);
            }
            ctx.setExceptionHandled(true);
            throw HyracksDataException.create(e);
        }
    }