in hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java [330:484]
private void insertTuple(ICachedPage node, int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf)
throws HyracksDataException, TreeIndexException {
boolean succeeded = false;
FrameOpSpaceStatus spaceStatus;
if (!isLeaf) {
spaceStatus = ctx.interiorFrame.hasSpaceInsert(tuple);
} else {
spaceStatus = ctx.leafFrame.hasSpaceInsert(tuple);
}
switch (spaceStatus) {
case SUFFICIENT_CONTIGUOUS_SPACE: {
try {
if (!isLeaf) {
ctx.interiorFrame.insert(tuple, -1);
} else {
ctx.modificationCallback.found(null, tuple);
ctx.leafFrame.insert(tuple, -1);
}
succeeded = true;
} finally {
if (succeeded) {
ctx.LSNUpdates.add(node);
ctx.splitKey.reset();
} else if (isLeaf) {
// In case of a crash, we un-latch the interior node
// inside updateParentForInsert.
node.releaseWriteLatch(true);
bufferCache.unpin(node);
}
}
break;
}
case SUFFICIENT_SPACE: {
try {
if (!isLeaf) {
ctx.interiorFrame.compact();
ctx.interiorFrame.insert(tuple, -1);
} else {
ctx.leafFrame.compact();
ctx.modificationCallback.found(null, tuple);
ctx.leafFrame.insert(tuple, -1);
}
succeeded = true;
} finally {
if (succeeded) {
ctx.LSNUpdates.add(node);
ctx.splitKey.reset();
} else if (isLeaf) {
// In case of a crash, we un-latch the interior node
// inside updateParentForInsert.
node.releaseWriteLatch(true);
bufferCache.unpin(node);
}
}
break;
}
case INSUFFICIENT_SPACE: {
int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId), true);
rightNode.acquireWriteLatch();
try {
IRTreeFrame rightFrame;
if (!isLeaf) {
rightFrame = (IRTreeFrame) interiorFrameFactory.createFrame();
rightFrame.setPage(rightNode);
rightFrame.initBuffer((byte) ctx.interiorFrame.getLevel());
rightFrame.setRightPage(ctx.interiorFrame.getRightPage());
ctx.interiorFrame.split(rightFrame, tuple, ctx.splitKey);
ctx.interiorFrame.setRightPage(rightPageId);
} else {
rightFrame = (IRTreeFrame) leafFrameFactory.createFrame();
rightFrame.setPage(rightNode);
rightFrame.initBuffer((byte) 0);
rightFrame.setRightPage(ctx.interiorFrame.getRightPage());
ctx.modificationCallback.found(null, tuple);
ctx.leafFrame.split(rightFrame, tuple, ctx.splitKey);
ctx.leafFrame.setRightPage(rightPageId);
}
succeeded = true;
} finally {
if (succeeded) {
ctx.NSNUpdates.add(rightNode);
ctx.LSNUpdates.add(rightNode);
ctx.NSNUpdates.add(node);
ctx.LSNUpdates.add(node);
} else if (isLeaf) {
// In case of a crash, we un-latch the interior node
// inside updateParentForInsert.
node.releaseWriteLatch(true);
bufferCache.unpin(node);
rightNode.releaseWriteLatch(true);
bufferCache.unpin(rightNode);
} else {
rightNode.releaseWriteLatch(true);
bufferCache.unpin(rightNode);
}
}
ctx.splitKey.setPages(pageId, rightPageId);
if (pageId == rootPage) {
int newLeftId = freePageManager.getFreePage(ctx.metaFrame);
ICachedPage newLeftNode = bufferCache
.pin(BufferedFileHandle.getDiskPageId(fileId, newLeftId), true);
newLeftNode.acquireWriteLatch();
succeeded = false;
try {
// copy left child to new left child
System.arraycopy(node.getBuffer().array(), 0, newLeftNode.getBuffer().array(), 0, newLeftNode
.getBuffer().capacity());
// initialize new root (leftNode becomes new root)
ctx.interiorFrame.setPage(node);
ctx.interiorFrame.initBuffer((byte) (ctx.interiorFrame.getLevel() + 1));
ctx.splitKey.setLeftPage(newLeftId);
ctx.interiorFrame.insert(ctx.splitKey.getLeftTuple(), -1);
ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), -1);
succeeded = true;
} finally {
if (succeeded) {
ctx.NSNUpdates.remove(ctx.NSNUpdates.size() - 1);
ctx.LSNUpdates.remove(ctx.LSNUpdates.size() - 1);
ctx.NSNUpdates.add(newLeftNode);
ctx.LSNUpdates.add(newLeftNode);
ctx.NSNUpdates.add(node);
ctx.LSNUpdates.add(node);
ctx.splitKey.reset();
} else if (isLeaf) {
// In case of a crash, we un-latch the interior node
// inside updateParentForInsert.
node.releaseWriteLatch(true);
bufferCache.unpin(node);
rightNode.releaseWriteLatch(true);
bufferCache.unpin(rightNode);
newLeftNode.releaseWriteLatch(true);
bufferCache.unpin(newLeftNode);
} else {
rightNode.releaseWriteLatch(true);
bufferCache.unpin(rightNode);
newLeftNode.releaseWriteLatch(true);
bufferCache.unpin(newLeftNode);
}
}
}
break;
}
}
}