ACTOR static Future commitSubtree()

in fdbserver/VersionedBTree.actor.cpp [6141:6866]


	ACTOR static Future<Void> commitSubtree(
	    VersionedBTree* self,
	    CommitBatch* batch,
	    BTreePageIDRef rootID,
	    unsigned int height,
	    MutationBuffer::const_iterator mBegin, // greatest mutation boundary <= subtreeLowerBound->key
	    MutationBuffer::const_iterator mEnd, // least boundary >= subtreeUpperBound->key
	    InternalPageSliceUpdate* update) {

		state std::string context;
		if (REDWOOD_DEBUG) {
			context = format("CommitSubtree(root=%s): ", toString(rootID).c_str());
		}
		debug_printf("%s %s\n", context.c_str(), update->toString().c_str());
		if (REDWOOD_DEBUG) {
			debug_printf("%s ---------MUTATION BUFFER SLICE ---------------------\n", context.c_str());
			auto begin = mBegin;
			while (1) {
				debug_printf("%s Mutation: '%s':  %s\n",
				             context.c_str(),
				             printable(begin.key()).c_str(),
				             begin.mutation().toString().c_str());
				if (begin == mEnd) {
					break;
				}
				++begin;
			}
			debug_printf("%s -------------------------------------\n", context.c_str());
		}

		state Reference<const ArenaPage> page =
		    wait(readPage(PagerEventReasons::Commit, height, batch->snapshot, rootID, height, false, true));

		// If the page exists in the cache, it must be copied before modification.
		// That copy will be referenced by pageCopy, as page must stay in scope in case anything references its
		// memory and it gets evicted from the cache.
		// If the page is not in the cache, then no copy is needed so we will initialize pageCopy to page
		state Reference<const ArenaPage> pageCopy;

		state BTreePage* btPage = (BTreePage*)page->begin();
		ASSERT(height == btPage->height);
		++g_redwoodMetrics.level(height).metrics.pageCommitStart;

		// TODO:  Decide if it is okay to update if the subtree boundaries are expanded.  It can result in
		// records in a DeltaTree being outside its decode boundary range, which isn't actually invalid
		// though it is awkward to reason about.
		// TryToUpdate indicates insert and erase operations should be tried on the existing page first
		state bool tryToUpdate = btPage->tree()->numItems > 0 && update->boundariesNormal();

		debug_printf(
		    "%s commitSubtree(): %s\n",
		    context.c_str(),
		    btPage
		        ->toString(
		            false, rootID, batch->snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
		        .c_str());

		state BTreePage::BinaryTree::Cursor cursor =
		    update->cBegin.valid() ? getCursor(page, update->cBegin) : getCursor(page, dbBegin, dbEnd);

		if (REDWOOD_DEBUG) {
			debug_printf("%s ---------MUTATION BUFFER SLICE ---------------------\n", context.c_str());
			auto begin = mBegin;
			while (1) {
				debug_printf("%s Mutation: '%s':  %s\n",
				             context.c_str(),
				             printable(begin.key()).c_str(),
				             begin.mutation().toString().c_str());
				if (begin == mEnd) {
					break;
				}
				++begin;
			}
			debug_printf("%s -------------------------------------\n", context.c_str());
		}

		// Leaf Page
		if (btPage->isLeaf()) {
			// When true, we are modifying the existing DeltaTree
			// When false, we are accumulating retained and added records in merged vector to build pages from them.
			bool updatingDeltaTree = tryToUpdate;
			bool changesMade = false;

			// Copy page for modification if not already copied
			auto copyForUpdate = [&]() {
				if (!pageCopy.isValid()) {
					pageCopy = clonePageForUpdate(page);
					btPage = (BTreePage*)pageCopy->begin();
					cursor.switchTree(btPage->tree());
				}
			};

			state Standalone<VectorRef<RedwoodRecordRef>> merged;

			// The first mutation buffer boundary has a key <= the first key in the page.

			cursor.moveFirst();
			debug_printf("%s Leaf page, applying changes.\n", context.c_str());

			// Now, process each mutation range and merge changes with existing data.
			bool firstMutationBoundary = true;
			constexpr int maxHeightAllowed = 8;

			while (mBegin != mEnd) {
				// Apply the change to the mutation buffer start boundary key only if
				//   - there actually is a change (clear or set to new value)
				//   - either this is not the first boundary or it is but its key matches our lower bound key
				bool applyBoundaryChange = mBegin.mutation().boundaryChanged &&
				                           (!firstMutationBoundary || mBegin.key() == update->subtreeLowerBound.key);
				bool boundaryExists = cursor.valid() && cursor.get().key == mBegin.key();

				debug_printf("%s New mutation boundary: '%s': %s  applyBoundaryChange=%d  boundaryExists=%d "
				             "updatingDeltaTree=%d\n",
				             context.c_str(),
				             printable(mBegin.key()).c_str(),
				             mBegin.mutation().toString().c_str(),
				             applyBoundaryChange,
				             boundaryExists,
				             updatingDeltaTree);

				firstMutationBoundary = false;

				if (applyBoundaryChange) {
					// If the boundary is being set to a value, the new KV record will be inserted
					bool shouldInsertBoundary = mBegin.mutation().boundarySet();

					// Optimization:  In-place value update of new same-sized value
					// If the boundary exists in the page and we're in update mode and the boundary is being set to a
					// new value of the same length as the old value then just update the value bytes.
					if (boundaryExists && updatingDeltaTree && shouldInsertBoundary &&
					    mBegin.mutation().boundaryValue.get().size() == cursor.get().value.get().size()) {
						changesMade = true;
						shouldInsertBoundary = false;

						debug_printf("%s In-place value update for %s [existing, boundary start]\n",
						             context.c_str(),
						             cursor.get().toString().c_str());

						copyForUpdate();
						memcpy((uint8_t*)cursor.get().value.get().begin(),
						       mBegin.mutation().boundaryValue.get().begin(),
						       cursor.get().value.get().size());
						cursor.moveNext();
					} else if (boundaryExists) {
						// An in place update can't be done, so if the boundary exists then erase or skip the record
						changesMade = true;

						// If updating, erase from the page, otherwise do not add to the output set
						if (updatingDeltaTree) {
							debug_printf("%s Erasing %s [existing, boundary start]\n",
							             context.c_str(),
							             cursor.get().toString().c_str());

							copyForUpdate();
							btPage->kvBytes -= cursor.get().kvBytes();
							cursor.erase();
						} else {
							debug_printf("%s Skipped %s [existing, boundary start]\n",
							             context.c_str(),
							             cursor.get().toString().c_str());
							cursor.moveNext();
						}
					}

					// If the boundary value is being set and we must insert it, add it to the page or the output set
					if (shouldInsertBoundary) {
						RedwoodRecordRef rec(mBegin.key(), mBegin.mutation().boundaryValue.get());
						changesMade = true;

						// If updating, first try to add the record to the page
						if (updatingDeltaTree) {
							copyForUpdate();
							if (cursor.insert(rec, update->skipLen, maxHeightAllowed)) {
								btPage->kvBytes += rec.kvBytes();
								debug_printf("%s Inserted %s [mutation, boundary start]\n",
								             context.c_str(),
								             rec.toString().c_str());
							} else {
								debug_printf("%s Insert failed for %s [mutation, boundary start]\n",
								             context.c_str(),
								             rec.toString().c_str());

								// Since the insert failed we must switch to a linear merge of existing data and
								// mutations, accumulating the new record set in the merge vector and build new pages
								// from it. First, we must populate the merged vector with all the records up to but not
								// including the current mutation boundary key.
								auto c = cursor;
								c.moveFirst();
								while (c != cursor) {
									debug_printf(
									    "%s catch-up adding %s\n", context.c_str(), c.get().toString().c_str());
									merged.push_back(merged.arena(), c.get());
									c.moveNext();
								}
								updatingDeltaTree = false;
							}
						}

						// If not updating, possibly due to insert failure above, then add record to the output set
						if (!updatingDeltaTree) {
							merged.push_back(merged.arena(), rec);
							debug_printf(
							    "%s Added %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str());
						}
					}
				} else if (boundaryExists) {
					// If the boundary exists in the page but there is no pending change,
					// then if updating move past it, otherwise add it to the output set.
					if (updatingDeltaTree) {
						cursor.moveNext();
					} else {
						merged.push_back(merged.arena(), cursor.get());
						debug_printf("%s Added %s [existing, boundary start]\n",
						             context.c_str(),
						             cursor.get().toString().c_str());
						cursor.moveNext();
					}
				}

				// Before advancing the iterator, get whether or not the records in the following range must be removed
				bool remove = mBegin.mutation().clearAfterBoundary;
				// Advance to the next boundary because we need to know the end key for the current range.
				++mBegin;
				if (mBegin == mEnd) {
					update->skipLen = 0;
				}

				debug_printf("%s Mutation range end: '%s'\n", context.c_str(), printable(mBegin.key()).c_str());

				// Now handle the records up through but not including the next mutation boundary key
				RedwoodRecordRef end(mBegin.key());

				// If the records are being removed and we're not doing an in-place update
				// OR if we ARE doing an update but the records are NOT being removed, then just skip them.
				if (remove != updatingDeltaTree) {
					// If not updating, then the records, if any exist, are being removed.  We don't know if there
					// actually are any but we must assume there are.
					if (!updatingDeltaTree) {
						changesMade = true;
					}

					debug_printf("%s Seeking forward to next boundary (remove=%d updating=%d) %s\n",
					             context.c_str(),
					             remove,
					             updatingDeltaTree,
					             mBegin.key().toString().c_str());
					cursor.seekGreaterThanOrEqual(end, update->skipLen);
				} else {
					// Otherwise we must visit the records.  If updating, the visit is to erase them, and if doing a
					// linear merge than the visit is to add them to the output set.
					while (cursor.valid() && cursor.get().compare(end, update->skipLen) < 0) {
						if (updatingDeltaTree) {
							debug_printf("%s Erasing %s [existing, boundary start]\n",
							             context.c_str(),
							             cursor.get().toString().c_str());

							copyForUpdate();
							btPage->kvBytes -= cursor.get().kvBytes();
							cursor.erase();
							changesMade = true;
						} else {
							merged.push_back(merged.arena(), cursor.get());
							debug_printf(
							    "%s Added %s [existing, middle]\n", context.c_str(), merged.back().toString().c_str());
							cursor.moveNext();
						}
					}
				}
			}

			// If there are still more records, they have the same key as the end boundary
			if (cursor.valid()) {
				// If the end boundary is changing, we must remove the remaining records in this page
				bool remove = mEnd.mutation().boundaryChanged;
				if (remove) {
					changesMade = true;
				}

				// If we don't have to remove the records and we are updating, do nothing.
				// If we do have to remove the records and we are not updating, do nothing.
				if (remove != updatingDeltaTree) {
					debug_printf("%s Ignoring remaining records, remove=%d updating=%d\n",
					             context.c_str(),
					             remove,
					             updatingDeltaTree);
				} else {
					// If updating and the key is changing, we must visit the records to erase them.
					// If not updating and the key is not changing, we must visit the records to add them to the output
					// set.
					while (cursor.valid()) {
						if (updatingDeltaTree) {
							debug_printf(
							    "%s Erasing %s and beyond [existing, matches changed upper mutation boundary]\n",
							    context.c_str(),
							    cursor.get().toString().c_str());

							copyForUpdate();
							btPage->kvBytes -= cursor.get().kvBytes();
							cursor.erase();
						} else {
							merged.push_back(merged.arena(), cursor.get());
							debug_printf(
							    "%s Added %s [existing, tail]\n", context.c_str(), merged.back().toString().c_str());
							cursor.moveNext();
						}
					}
				}
			} else {
				debug_printf("%s No records matching mutation buffer end boundary key\n", context.c_str());
			}

			// No changes were actually made.  This could happen if the only mutations are clear ranges which do not
			// match any records.
			if (!changesMade) {
				debug_printf("%s No changes were made during mutation merge, returning %s\n",
				             context.c_str(),
				             toString(*update).c_str());
				return Void();
			} else {
				debug_printf(
				    "%s Changes were made, writing, but subtree may still be unchanged from parent's perspective.\n",
				    context.c_str());
			}

			if (updatingDeltaTree) {
				// If the tree is now empty, delete the page
				if (cursor.tree->numItems == 0) {
					update->cleared();
					self->freeBTreePage(rootID, batch->writeVersion);
					debug_printf("%s Page updates cleared all entries, returning %s\n",
					             context.c_str(),
					             toString(*update).c_str());
				} else {
					// Otherwise update it.
					BTreePageIDRef newID = wait(self->updateBTreePage(
					    self, rootID, &update->newLinks.arena(), pageCopy.castTo<ArenaPage>(), batch->writeVersion));

					update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
					debug_printf(
					    "%s Page updated in-place, returning %s\n", context.c_str(), toString(*update).c_str());
				}
				return Void();
			}

			// If everything in the page was deleted then this page should be deleted as of the new version
			if (merged.empty()) {
				update->cleared();
				self->freeBTreePage(rootID, batch->writeVersion);

				debug_printf("%s All leaf page contents were cleared, returning %s\n",
				             context.c_str(),
				             toString(*update).c_str());
				return Void();
			}

			// Rebuild new page(s).
			state Standalone<VectorRef<RedwoodRecordRef>> entries = wait(writePages(self,
			                                                                        &update->subtreeLowerBound,
			                                                                        &update->subtreeUpperBound,
			                                                                        merged,
			                                                                        height,
			                                                                        batch->writeVersion,
			                                                                        rootID));

			// Put new links into update and tell update that pages were rebuilt
			update->rebuilt(entries);

			debug_printf("%s Merge complete, returning %s\n", context.c_str(), toString(*update).c_str());
			return Void();
		} else {
			// Internal Page
			std::vector<Future<Void>> recursions;
			state std::vector<std::unique_ptr<InternalPageSliceUpdate>> slices;

			cursor.moveFirst();

			bool first = true;

			while (cursor.valid()) {
				slices.emplace_back(new InternalPageSliceUpdate());
				InternalPageSliceUpdate& u = *slices.back();

				// At this point we should never be at a null child page entry because the first entry of a page
				// can't be null and this loop will skip over null entries that come after non-null entries.
				ASSERT(cursor.get().value.present());

				// Subtree lower boundary is this page's subtree lower bound or cursor
				u.cBegin = cursor;
				u.decodeLowerBound = cursor.get();
				if (first) {
					u.subtreeLowerBound = update->subtreeLowerBound;
					first = false;
					// mbegin is already the first mutation that could affect this subtree described by update
				} else {
					u.subtreeLowerBound = u.decodeLowerBound;
					mBegin = mEnd;
					// mBegin is either at or greater than subtreeLowerBound->key, which was the subtreeUpperBound->key
					// for the previous subtree slice.  But we need it to be at or *before* subtreeLowerBound->key
					// so if mBegin.key() is not exactly the subtree lower bound key then decrement it.
					if (mBegin.key() != u.subtreeLowerBound.key) {
						--mBegin;
					}
				}

				BTreePageIDRef pageID = cursor.get().getChildPage();
				ASSERT(!pageID.empty());

				// The decode upper bound is always the next key after the child link, or the decode upper bound for
				// this page
				if (cursor.moveNext()) {
					u.decodeUpperBound = cursor.get();
					// If cursor record has a null child page then it exists only to preserve a previous
					// subtree boundary that is now needed for reading the subtree at cBegin.
					if (!cursor.get().value.present()) {
						// If the upper bound is provided by a dummy record in [cBegin, cEnd) then there is no
						// requirement on the next subtree range or the parent page to have a specific upper boundary
						// for decoding the subtree.
						u.expectedUpperBound.reset();
						cursor.moveNext();
						// If there is another record after the null child record, it must have a child page value
						ASSERT(!cursor.valid() || cursor.get().value.present());
					} else {
						u.expectedUpperBound = u.decodeUpperBound;
					}
				} else {
					u.decodeUpperBound = update->decodeUpperBound;
					u.expectedUpperBound = update->decodeUpperBound;
				}
				u.subtreeUpperBound = cursor.valid() ? cursor.get() : update->subtreeUpperBound;
				u.cEnd = cursor;
				u.skipLen = 0; // TODO: set this

				// Find the mutation buffer range that includes all changes to the range described by u
				mEnd = batch->mutations->lower_bound(u.subtreeUpperBound.key);

				// If the mutation range described by mBegin extends to mEnd, then see if the part of that range
				// that overlaps with u's subtree range is being fully cleared or fully unchanged.
				auto next = mBegin;
				++next;
				if (next == mEnd) {
					// Check for uniform clearedness or unchangedness for the range mutation where it overlaps u's
					// subtree
					const KeyRef& mutationBoundaryKey = mBegin.key();
					const RangeMutation& range = mBegin.mutation();
					bool uniform;
					if (range.clearAfterBoundary) {
						// If the mutation range after the boundary key is cleared, then the mutation boundary key must
						// be cleared or must be different than the subtree lower bound key so that it doesn't matter
						uniform = range.boundaryCleared() || mutationBoundaryKey != u.subtreeLowerBound.key;
					} else {
						// If the mutation range after the boundary key is unchanged, then the mutation boundary key
						// must be also unchanged or must be different than the subtree lower bound key so that it
						// doesn't matter
						uniform = !range.boundaryChanged || mutationBoundaryKey != u.subtreeLowerBound.key;
					}

					// If u's subtree is either all cleared or all unchanged
					if (uniform) {
						// We do not need to recurse to this subtree.  Next, let's see if we can embiggen u's range to
						// include sibling subtrees also covered by (mBegin, mEnd) so we can not recurse to those, too.
						// If the cursor is valid, u.subtreeUpperBound is the cursor's position, which is >= mEnd.key().
						// If equal, no range expansion is possible.
						if (cursor.valid() && mEnd.key() != u.subtreeUpperBound.key) {
							// TODO:  If cursor hints are available, use (cursor, 1)
							cursor.seekLessThanOrEqual(mEnd.key(), update->skipLen);

							// If this seek moved us ahead, to something other than cEnd, then update subtree range
							// boundaries
							if (cursor != u.cEnd) {
								// If the cursor is at a record with a null child, back up one step because it is in the
								// middle of the next logical subtree, as null child records are not subtree boundaries.
								ASSERT(cursor.valid());
								if (!cursor.get().value.present()) {
									cursor.movePrev();
								}

								u.cEnd = cursor;
								u.subtreeUpperBound = cursor.get();
								u.skipLen = 0; // TODO: set this

								// The new decode upper bound is either cEnd or the record before it if it has no child
								// link
								auto c = u.cEnd;
								c.movePrev();
								ASSERT(c.valid());
								if (!c.get().value.present()) {
									u.decodeUpperBound = c.get();
									u.expectedUpperBound.reset();
								} else {
									u.decodeUpperBound = u.subtreeUpperBound;
									u.expectedUpperBound = u.subtreeUpperBound;
								}
							}
						}

						// The subtree range is either fully cleared or unchanged.
						if (range.clearAfterBoundary) {
							// Cleared
							u.cleared();
							auto c = u.cBegin;
							while (c != u.cEnd) {
								RedwoodRecordRef rec = c.get();
								if (rec.value.present()) {
									if (height == 2) {
										debug_printf("%s: freeing child page in cleared subtree range: %s\n",
										             context.c_str(),
										             ::toString(rec.getChildPage()).c_str());
										self->freeBTreePage(rec.getChildPage(), batch->writeVersion);
									} else {
										debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n",
										             context.c_str(),
										             ::toString(rec.getChildPage()).c_str());
										self->m_lazyClearQueue.pushBack(LazyClearQueueEntry{
										    (uint8_t)(height - 1), batch->writeVersion, rec.getChildPage() });
									}
								}
								c.moveNext();
							}
						} else {
							// Subtree range unchanged
						}

						debug_printf("%s: MutationBuffer covers this range in a single mutation, not recursing: %s\n",
						             context.c_str(),
						             u.toString().c_str());

						// u has already been initialized with the correct result, no recursion needed, so restart the
						// loop.
						continue;
					}
				}

				// If this page has height of 2 then its children are leaf nodes
				recursions.push_back(self->commitSubtree(self, batch, pageID, height - 1, mBegin, mEnd, &u));
			}

			debug_printf(
			    "%s Recursions from internal page started. pageSize=%d level=%d children=%d slices=%d recursions=%d\n",
			    context.c_str(),
			    btPage->size(),
			    btPage->height,
			    btPage->tree()->numItems,
			    slices.size(),
			    recursions.size());

			wait(waitForAll(recursions));
			debug_printf("%s Recursions done, processing slice updates.\n", context.c_str());

			// ParentInfo could be invalid after a wait and must be re-initialized.
			// All uses below occur before waits so no reinitialization is done.
			state ParentInfo* parentInfo = &self->childUpdateTracker[rootID.front()];

			// InternalPageModifier takes the results of the recursive commitSubtree() calls in order
			// and makes changes to page as needed, copying as needed, and generating an array from
			// which to build new page(s) if modification is not possible or not allowed.
			// If pageCopy is already set it was initialized to page above so the modifier doesn't need
			// to copy it
			state InternalPageModifier modifier(page, pageCopy.isValid(), tryToUpdate, parentInfo);

			// Apply the possible changes for each subtree range recursed to, except the last one.
			// For each range, the expected next record, if any, is checked against the first boundary
			// of the next range, if any.
			for (int i = 0, iEnd = slices.size() - 1; i < iEnd; ++i) {
				modifier.applyUpdate(*slices[i], slices[i + 1]->getFirstBoundary());
			}

			// The expected next record for the final range is checked against one of the upper boundaries passed to
			// this commitSubtree() instance.  If changes have already been made, then the subtree upper boundary is
			// passed, so in the event a different upper boundary is needed it will be added to the already-modified
			// page.  Otherwise, the decode boundary is used which will prevent this page from being modified for the
			// sole purpose of adding a dummy upper bound record.
			debug_printf("%s Applying final child range update. changesMade=%d  Parent update is: %s\n",
			             context.c_str(),
			             modifier.changesMade,
			             update->toString().c_str());
			modifier.applyUpdate(*slices.back(),
			                     modifier.changesMade ? &update->subtreeUpperBound : &update->decodeUpperBound);

			state bool detachChildren = (parentInfo->count > 2);
			state bool forceUpdate = false;

			// If no changes were made, but we should rewrite it to point directly to remapped child pages
			if (!modifier.changesMade && detachChildren) {
				debug_printf(
				    "%s Internal page forced rewrite because at least %d children have been updated in-place.\n",
				    context.c_str(),
				    parentInfo->count);

				forceUpdate = true;
				modifier.updating = true;

				// Make sure the modifier cloned the page so we can update the child links in-place below.
				modifier.cloneForUpdate();
				++g_redwoodMetrics.level(height).metrics.forceUpdate;
			}

			// If the modifier cloned the page for updating, then update our local pageCopy, btPage, and cursor
			if (modifier.clonedPage) {
				pageCopy = modifier.page;
				btPage = modifier.btPage();
				cursor.switchTree(btPage->tree());
			}

			// If page contents have changed
			if (modifier.changesMade || forceUpdate) {
				if (modifier.empty()) {
					update->cleared();
					debug_printf("%s All internal page children were deleted so deleting this page too, returning %s\n",
					             context.c_str(),
					             toString(*update).c_str());
					self->freeBTreePage(rootID, batch->writeVersion);
					self->childUpdateTracker.erase(rootID.front());
				} else {
					if (modifier.updating) {
						// Page was updated in place (or being forced to be updated in place to update child page ids)
						debug_printf(
						    "%s Internal page modified in-place tryToUpdate=%d forceUpdate=%d detachChildren=%d\n",
						    context.c_str(),
						    tryToUpdate,
						    forceUpdate,
						    detachChildren);

						if (detachChildren) {
							int detached = 0;
							cursor.moveFirst();
							auto& stats = g_redwoodMetrics.level(height);
							while (cursor.valid()) {
								if (cursor.get().value.present()) {
									for (auto& p : cursor.get().getChildPage()) {
										if (parentInfo->maybeUpdated(p)) {
											LogicalPageID newID =
											    self->m_pager->detachRemappedPage(p, batch->writeVersion);
											if (newID != invalidLogicalPageID) {
												debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
												p = newID;
												++stats.metrics.detachChild;
												++detached;
											}
										}
									}
								}
								cursor.moveNext();
							}
							parentInfo->clear();
							if (forceUpdate && detached == 0) {
								debug_printf("%s No children detached during forced update, returning %s\n",
								             context.c_str(),
								             toString(*update).c_str());
								return Void();
							}
						}

						BTreePageIDRef newID = wait(self->updateBTreePage(self,
						                                                  rootID,
						                                                  &update->newLinks.arena(),
						                                                  pageCopy.castTo<ArenaPage>(),
						                                                  batch->writeVersion));
						debug_printf(
						    "%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n",
						    context.c_str(),
						    toString(batch->writeVersion).c_str(),
						    btPage
						        ->toString(false,
						                   newID,
						                   batch->snapshot->getVersion(),
						                   update->decodeLowerBound,
						                   update->decodeUpperBound)
						        .c_str());

						update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
						debug_printf("%s Internal page updated in-place, returning %s\n",
						             context.c_str(),
						             toString(*update).c_str());
					} else {
						// Page was rebuilt, possibly split.
						debug_printf("%s Internal page could not be modified, rebuilding replacement(s).\n",
						             context.c_str());

						if (detachChildren) {
							auto& stats = g_redwoodMetrics.level(height);
							for (auto& rec : modifier.rebuild) {
								if (rec.value.present()) {
									BTreePageIDRef oldPages = rec.getChildPage();
									BTreePageIDRef newPages;
									for (int i = 0; i < oldPages.size(); ++i) {
										LogicalPageID p = oldPages[i];
										if (parentInfo->maybeUpdated(p)) {
											LogicalPageID newID =
											    self->m_pager->detachRemappedPage(p, batch->writeVersion);
											if (newID != invalidLogicalPageID) {
												// Rebuild record values reference original page memory so make a copy
												if (newPages.empty()) {
													newPages = BTreePageIDRef(modifier.rebuild.arena(), oldPages);
													rec.setChildPage(newPages);
												}
												debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
												newPages[i] = newID;
												++stats.metrics.detachChild;
											}
										}
									}
								}
							}
							parentInfo->clear();
						}

						Standalone<VectorRef<RedwoodRecordRef>> newChildEntries =
						    wait(writePages(self,
						                    &update->subtreeLowerBound,
						                    &update->subtreeUpperBound,
						                    modifier.rebuild,
						                    height,
						                    batch->writeVersion,
						                    rootID));
						update->rebuilt(newChildEntries);

						debug_printf(
						    "%s Internal page rebuilt, returning %s\n", context.c_str(), toString(*update).c_str());
					}
				}
			} else {
				debug_printf("%s Page has no changes, returning %s\n", context.c_str(), toString(*update).c_str());
			}
			return Void();
		}
	}