in hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java [563:743]
/**
 * Recursively descends the tree from page {@code pageId} and carries out the operation held in
 * {@code ctx} once a leaf is reached.
 *
 * <p>Outline of one invocation, as visible in this method:
 * <ol>
 * <li>Pin the page, latch it via {@code acquireLatch}, read its sm flag and leaf flag, and push its
 * page LSN onto {@code ctx.getPageLsns()}.</li>
 * <li>Release the latch on {@code parent} (if any) and unpin it.</li>
 * <li>Interior page without sm flag: recurse into the child selected by the predicate, then inspect
 * the top of the page-LSN stack for {@code RESTART_OP} / {@code FULL_RESTART_OP} markers left by the
 * child; otherwise handle a propagated split key for INSERT/UPSERT/UPDATE.</li>
 * <li>Page with sm flag set: release the page, bump the restart counter, and replace this page's LSN
 * on the stack with {@code RESTART_OP}.</li>
 * <li>Leaf page without sm flag: run the leaf-level operation; if it asks for a restart, replace this
 * page's LSN on the stack with {@code FULL_RESTART_OP}.</li>
 * </ol>
 *
 * <p>The caller learns about restarts only through the marker left on top of
 * {@code ctx.getPageLsns()}; this method returns nothing.
 *
 * @param pageId id of the page to visit
 * @param parent the already pinned and latched parent page, or {@code null} if there is none to
 *        release
 * @param parentIsReadLatched {@code true} if {@code parent} holds a read latch, {@code false} if it
 *        holds a write latch
 * @param ctx operation context (operation type, predicate, frames, split key, page-LSN stack,
 *        restart counter, cursor state)
 * @throws HyracksDataException if the operation exceeds {@code MAX_RESTARTS}, if a split key shows
 *         up during a DELETE, or wrapping any other {@link Throwable} raised inside the try block
 */
private void performOp(int pageId, ICachedPage parent, boolean parentIsReadLatched, BTreeOpContext ctx)
throws HyracksDataException {
ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), pageId));
ctx.getInteriorFrame().setPage(node);
// This check performs an unprotected read in the page: no latch is held yet, so the value
// is only used to decide how to latch and is re-read once the latch is held (see isLeaf below).
// TODO: document the concrete interleavings that make the unlatched value unreliable.
boolean unsafeIsLeaf = ctx.getInteriorFrame().isLeaf();
// acquireLatch reports whether it took a read latch (true) or a write latch (false).
boolean isReadLatched = acquireLatch(node, ctx, unsafeIsLeaf);
// presumably "sm" = structure modification in progress on this page -- confirm against the frame API
boolean smFlag = ctx.getInteriorFrame().getSmFlag();
// re-check leafness after latching
boolean isLeaf = ctx.getInteriorFrame().isLeaf();
// remember trail of pageLsns, to unwind recursion in case of an ongoing
// structure modification
ctx.getPageLsns().add(ctx.getInteriorFrame().getPageLsn());
try {
// Latch coupling: unlatch parent. This happens only after this page has been latched above.
if (parent != null) {
if (parentIsReadLatched) {
parent.releaseReadLatch();
} else {
parent.releaseWriteLatch(true);
}
bufferCache.unpin(parent);
}
if (!isLeaf || smFlag) {
if (!smFlag) {
// Interior page, no sm flag.
// We use this loop to deal with possibly multiple operation
// restarts due to ongoing structure modifications during
// the descent.
while (true) {
int childPageId = ctx.getInteriorFrame().getChildPageId(ctx.getPred());
// The recursive call receives this page as its parent and releases/unpins it.
performOp(childPageId, node, isReadLatched, ctx);
// This frame no longer owns the page; nulling it keeps the catch block below
// from releasing it again.
node = null;
if (!ctx.getPageLsns().isEmpty()) {
if (ctx.getPageLsns().getLast() == FULL_RESTART_OP) {
// Leave the marker on the stack so it keeps propagating to the callers.
break;
} else if (ctx.getPageLsns().getLast() == RESTART_OP) {
// Pop the restart op indicator.
ctx.getPageLsns().removeLast();
// A non-null result is treated below as this page, read-latched again.
// NOTE(review): presumably isConsistent re-pins/latches the page and compares
// its LSN with the one recorded during descent -- confirm in isConsistent.
node = isConsistent(pageId, ctx);
if (node != null) {
isReadLatched = true;
// Descend the tree again.
if (ctx.getOpRestarts() >= MAX_RESTARTS) {
// node is non-null here, so the catch block releases its read latch and pin.
throw HyracksDataException.create(ErrorCode.OPERATION_EXCEEDED_MAX_RESTARTS,
MAX_RESTARTS);
}
continue;
} else {
// Pop pageLsn of this page (version seen by this op during descent).
ctx.getPageLsns().removeLast();
// This node is not consistent set the restart indicator for upper level.
ctx.getPageLsns().add(RESTART_OP);
break;
}
}
}
// No restart marker on top of the stack (or the stack is empty): finish this level.
switch (ctx.getOperation()) {
case INSERT:
case UPSERT:
case UPDATE: {
// Is there a propagated split key?
if (ctx.getSplitKey().getBuffer() != null) {
// This page was handed to the child above, so pin and write-latch it again
// for the interior insert.
ICachedPage interiorNode =
bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), pageId));
interiorNode.acquireWriteLatch();
try {
// Insert or update op. Both can cause split keys to propagate upwards.
insertInterior(interiorNode, pageId, ctx.getSplitKey().getTuple(), ctx);
} finally {
interiorNode.releaseWriteLatch(true);
bufferCache.unpin(interiorNode);
}
} else {
// No split key to propagate.
unsetSmPages(ctx);
}
break;
}
case DELETE: {
if (ctx.getSplitKey().getBuffer() != null) {
throw new HyracksDataException(
"Split key was propagated during delete. Delete allows empty leaf pages.");
}
break;
}
default: {
// Do nothing for Search and DiskOrderScan.
break;
}
}
// Operation completed.
break;
} // end while
} else { // smFlag
// The sm flag is set on this page: back off and ask the caller to restart.
ctx.setOpRestarts(ctx.getOpRestarts() + 1);
if (isReadLatched) {
node.releaseReadLatch();
} else {
node.releaseWriteLatch(true);
}
bufferCache.unpin(node);
// NOTE(review): node is not nulled after this release; if a statement below threw,
// the catch block would release/unpin the same page a second time -- confirm that
// none of the following statements can throw.
// TODO: this should be an instant duration lock, how to do
// this in java?
// instead we just immediately release the lock. this is
// inefficient but still correct and will not cause
// latch-deadlock
treeLatch.readLock().lock();
treeLatch.readLock().unlock();
// unwind recursion and restart operation, find lowest page
// with a pageLsn as seen by this operation during descent
ctx.getPageLsns().removeLast(); // pop current page lsn
// put special value on the stack to inform caller of
// restart
ctx.getPageLsns().add(RESTART_OP);
}
} else { // isLeaf and !smFlag
// We may have to restart an op to avoid latch deadlock.
boolean restartOp = false;
ctx.getLeafFrame().setPage(node);
// Each modifying leaf helper returns whether the operation must be restarted.
switch (ctx.getOperation()) {
case INSERT: {
int targetTupleIndex = ctx.getLeafFrame().findInsertTupleIndex(ctx.getPred().getLowKey());
restartOp = insertLeaf(ctx.getPred().getLowKey(), targetTupleIndex, pageId, ctx);
break;
}
case UPSERT: {
int targetTupleIndex = ctx.getLeafFrame().findUpsertTupleIndex(ctx.getPred().getLowKey());
restartOp = upsertLeaf(ctx.getPred().getLowKey(), targetTupleIndex, pageId, ctx);
break;
}
case UPDATE: {
int oldTupleIndex = ctx.getLeafFrame().findUpdateTupleIndex(ctx.getPred().getLowKey());
restartOp = updateLeaf(ctx.getPred().getLowKey(), oldTupleIndex, pageId, ctx);
break;
}
case DELETE: {
restartOp = deleteLeaf(node, pageId, ctx.getPred().getLowKey(), ctx);
break;
}
case SEARCH: {
// Hand the leaf page over to the cursor. The page is neither unlatched nor
// unpinned in this method for SEARCH (see the check below).
// NOTE(review): presumably the cursor releases the latch and pin -- confirm.
ctx.getCursorInitialState().setSearchOperationCallback(ctx.getSearchCallback());
ctx.getCursorInitialState().setOriginialKeyComparator(ctx.getCmp());
ctx.getCursorInitialState().setPage(node);
ctx.getCursorInitialState().setPageId(pageId);
ctx.getCursor().open(ctx.getCursorInitialState(), ctx.getPred());
break;
}
}
if (ctx.getOperation() != IndexOperation.SEARCH) {
// NOTE(review): releases a write latch unconditionally, i.e. assumes acquireLatch
// write-latched the leaf for every non-SEARCH operation -- confirm in acquireLatch.
node.releaseWriteLatch(true);
bufferCache.unpin(node);
}
if (restartOp) {
// Wait for the SMO before restarting: take and immediately drop the tree read lock.
// NOTE(review): the original comment said "Wait for the SMO to persistFrontiers";
// that wording looks like a rename artifact, presumably it meant "finish" -- confirm.
// NOTE(review): the original comment also said "We didn't release the pin on the
// page!!", but for non-SEARCH operations the page was unpinned just above --
// confirm whether that remark is stale.
treeLatch.readLock().lock();
treeLatch.readLock().unlock();
// Replace this page's LSN with the full-restart marker for the callers.
ctx.getPageLsns().removeLast();
ctx.getPageLsns().add(FULL_RESTART_OP);
}
}
} catch (Throwable e) {
// Release this frame's page only once across the unwinding recursion: skip it when an
// inner frame already flagged the exception as handled, or when this frame no longer
// owns the page (node == null).
if (!ctx.isExceptionHandled() && node != null) {
if (isReadLatched) {
node.releaseReadLatch();
} else {
node.releaseWriteLatch(true);
}
bufferCache.unpin(node);
}
ctx.setExceptionHandled(true);
// Rethrow wrapped as a HyracksDataException, passing the caught throwable to create().
throw HyracksDataException.create(e);
}
}