// ACTOR Future<Void> blobGranuleUpdateFiles()
// Extracted from fdbserver/BlobWorker.actor.cpp, lines 1056-1607.

// Main per-granule file-updater actor. Consumes the granule's change feed (and, for a granule
// produced by a split/merge, its parent's change feed first), buffers mutations, and writes
// delta files once enough bytes accumulate. After BG_DELTA_BYTES_BEFORE_COMPACT bytes of new
// delta files it coordinates with the blob manager (via currentManagerStatusStream) before
// re-snapshotting the granule. Snapshot and delta writes are pipelined: futures are queued in
// inFlightBlobSnapshot / inFlightDeltaFiles and their results are folded into metadata->files
// as they complete. Private "rollback" mutations from the change feed are handled by rewinding
// buffered state and re-opening the feed at the rollback version.
//
// Parameters:
//   bwData       - shared blob worker state (db handle, stats, manager status stream).
//   metadata     - this granule's mutable state (files, buffered deltas, version watermarks).
//   assignFuture - resolves with the GranuleStartState once the worker has persisted the range
//                  assignment and acquired the granule lock; nothing happens before it fires.
//
// Exits only by exception: granule_assignment_conflict / revocation / cancellation, or an
// internal error (retryable errors are re-queued via bwData->granuleUpdateErrors).
ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
                                          Reference<GranuleMetadata> metadata,
                                          Future<GranuleStartState> assignFuture) {
	state Reference<ChangeFeedData> oldChangeFeedStream = makeReference<ChangeFeedData>();
	state Reference<ChangeFeedData> changeFeedStream = makeReference<ChangeFeedData>();
	state Future<BlobFileIndex> inFlightBlobSnapshot;
	state std::deque<InFlightDeltaFile> inFlightDeltaFiles;
	state Future<Void> oldChangeFeedFuture;
	state Future<Void> changeFeedFuture;
	state GranuleStartState startState;
	state bool readOldChangeFeed;
	state bool lastFromOldChangeFeed = false;
	state Optional<std::pair<KeyRange, UID>> oldChangeFeedDataComplete;
	state Key cfKey;
	state Optional<Key> oldCFKey;

	// [first, second] version ranges tossed by rollbacks; "in progress" entries move to
	// "completed" once the matching rollback mutation is re-encountered in the feed.
	state std::deque<std::pair<Version, Version>> rollbacksInProgress;
	state std::deque<std::pair<Version, Version>> rollbacksCompleted;

	// NOTE(review): snapshotEligible has no initializer here and is only assigned below when
	// startState.existingFiles.present() (or after a delta-file flush). If existingFiles is
	// absent, the first read of it in the main loop appears to be of an indeterminate bool —
	// confirm whether the actor compiler zero-initializes state variables, else initialize it.
	state bool snapshotEligible; // just wrote a delta file or just took granule over from another worker
	state bool justDidRollback = false;

	try {
		// set resume snapshot so it's not valid until we pause to ask the blob manager for a re-snapshot
		metadata->resumeSnapshot.send(Void());

		// before starting, make sure worker persists range assignment and acquires the granule lock
		GranuleStartState _info = wait(assignFuture);
		startState = _info;

		wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));

		// Change feed keys are the stringified granule UIDs (parent's UID for the old feed).
		cfKey = StringRef(startState.granuleID.toString());
		if (startState.parentGranule.present()) {
			oldCFKey = StringRef(startState.parentGranule.get().second.toString());
		}

		if (BW_DEBUG) {
			fmt::print("Granule File Updater Starting for [{0} - {1}):\n",
			           metadata->keyRange.begin.printable(),
			           metadata->keyRange.end.printable());
			fmt::print("  CFID: {}\n", startState.granuleID.toString());
			fmt::print("  CF Start Version: {}\n", startState.changeFeedStartVersion);
			fmt::print("  Previous Durable Version: {}\n", startState.previousDurableVersion);
			fmt::print("  doSnapshot={}\n", startState.doSnapshot ? "T" : "F");
			fmt::print("  Prev CFID: {}\n",
			           startState.parentGranule.present() ? startState.parentGranule.get().second.toString().c_str()
			                                              : "");
			fmt::print("  blobFilesToSnapshot={}\n", startState.blobFilesToSnapshot.present() ? "T" : "F");
		}

		state Version startVersion;
		state BlobFileIndex newSnapshotFile;

		inFlightBlobSnapshot = Future<BlobFileIndex>(); // not valid!

		// if this is a reassign, calculate how close to a snapshot the previous owner was
		if (startState.existingFiles.present()) {
			GranuleFiles files = startState.existingFiles.get();
			if (!files.snapshotFiles.empty() && !files.deltaFiles.empty()) {
				Version snapshotVersion = files.snapshotFiles.back().version;
				// Sum sizes of delta files newer than the latest snapshot; this seeds the
				// re-snapshot byte counter so the new owner doesn't restart it from zero.
				for (int i = files.deltaFiles.size() - 1; i >= 0; i--) {
					if (files.deltaFiles[i].version > snapshotVersion) {
						metadata->bytesInNewDeltaFiles += files.deltaFiles[i].length;
					}
				}
			}
			metadata->files = startState.existingFiles.get();
			snapshotEligible = true;
		}

		if (!startState.doSnapshot) {
			// Taking over an existing granule: resume from the previous owner's durable state.
			startVersion = startState.previousDurableVersion;
			ASSERT(!metadata->files.snapshotFiles.empty());
			metadata->pendingSnapshotVersion = metadata->files.snapshotFiles.back().version;
			metadata->durableSnapshotVersion.set(metadata->pendingSnapshotVersion);
		} else {
			if (startState.blobFilesToSnapshot.present()) {
				// Split/merge child: build the initial snapshot by compacting the parent's blob
				// files in the background while we start consuming the change feed.
				inFlightBlobSnapshot =
				    compactFromBlob(bwData, metadata, startState.granuleID, startState.blobFilesToSnapshot.get());
				startVersion = startState.previousDurableVersion;
				metadata->durableSnapshotVersion.set(startState.blobFilesToSnapshot.get().snapshotFiles.back().version);
			} else {
				// Brand-new granule: synchronously dump an initial snapshot directly from FDB.
				ASSERT(startState.previousDurableVersion == invalidVersion);
				BlobFileIndex fromFDB = wait(dumpInitialSnapshotFromFDB(bwData, metadata, startState.granuleID));
				newSnapshotFile = fromFDB;
				ASSERT(startState.changeFeedStartVersion <= fromFDB.version);
				startVersion = newSnapshotFile.version;
				metadata->files.snapshotFiles.push_back(newSnapshotFile);
				metadata->durableSnapshotVersion.set(startVersion);

				// construct fake history entry so we can store start version for splitting later
				startState.history =
				    GranuleHistory(metadata->keyRange, startVersion, Standalone<BlobGranuleHistoryValue>());

				wait(yield(TaskPriority::BlobWorkerUpdateStorage));
			}
			metadata->pendingSnapshotVersion = startVersion;
		}

		// All version watermarks start at startVersion; reads at or below it are now servable.
		metadata->durableDeltaVersion.set(startVersion);
		metadata->pendingDeltaVersion = startVersion;
		metadata->bufferedDeltaVersion.set(startVersion);

		ASSERT(metadata->readable.canBeSet());
		metadata->readable.send(Void());

		if (startState.parentGranule.present()) {
			// FIXME: once we have empty versions, only include up to startState.changeFeedStartVersion in the read
			// stream. Then we can just stop the old stream when we get end_of_stream from this and not handle the
			// mutation version truncation stuff

			// FIXME: filtering on key range != change feed range doesn't work
			readOldChangeFeed = true;
			oldChangeFeedFuture =
			    bwData->db->getChangeFeedStream(oldChangeFeedStream,
			                                    oldCFKey.get(),
			                                    startVersion + 1,
			                                    MAX_VERSION,
			                                    startState.parentGranule.get().first /*metadata->keyRange*/);
		} else {
			readOldChangeFeed = false;
			changeFeedFuture = bwData->db->getChangeFeedStream(
			    changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange);
		}

		state Version lastVersion = startVersion + 1;
		// Main loop: each iteration reaps finished file writes, reads one batch of change feed
		// mutations, and processes them (possibly flushing a delta file or re-snapshotting).
		loop {
			// check outstanding snapshot/delta files for completion
			if (inFlightBlobSnapshot.isValid() && inFlightBlobSnapshot.isReady()) {
				BlobFileIndex completedSnapshot = wait(inFlightBlobSnapshot);
				metadata->files.snapshotFiles.push_back(completedSnapshot);
				metadata->durableSnapshotVersion.set(completedSnapshot.version);
				inFlightBlobSnapshot = Future<BlobFileIndex>(); // not valid!
				if (BW_DEBUG) {
					printf("Async Blob Snapshot completed for [%s - %s)\n",
					       metadata->keyRange.begin.printable().c_str(),
					       metadata->keyRange.end.printable().c_str());
				}

				wait(yield(TaskPriority::BlobWorkerUpdateStorage));
			}
			// Delta-file completions are only folded in while no snapshot is in flight, so the
			// snapshot (which precedes them in the pipeline) is durable first.
			if (!inFlightBlobSnapshot.isValid()) {
				while (inFlightDeltaFiles.size() > 0) {
					if (inFlightDeltaFiles.front().future.isReady()) {
						BlobFileIndex completedDeltaFile = wait(inFlightDeltaFiles.front().future);
						wait(handleCompletedDeltaFile(bwData,
						                              metadata,
						                              completedDeltaFile,
						                              cfKey,
						                              startState.changeFeedStartVersion,
						                              rollbacksCompleted));

						inFlightDeltaFiles.pop_front();
						wait(yield(TaskPriority::BlobWorkerUpdateStorage));
					} else {
						break;
					}
				}
			}

			// inject delay into reading change feed stream
			if (BUGGIFY_WITH_PROB(0.001)) {
				wait(delay(deterministicRandom()->random01(), TaskPriority::BlobWorkerReadChangeFeed));
			} else {
				wait(delay(0, TaskPriority::BlobWorkerReadChangeFeed));
			}

			state Standalone<VectorRef<MutationsAndVersionRef>> mutations;
			if (readOldChangeFeed) {
				Standalone<VectorRef<MutationsAndVersionRef>> oldMutations =
				    waitNext(oldChangeFeedStream->mutations.getFuture());
				// TODO filter old mutations won't be necessary, SS does it already
				if (filterOldMutations(
				        metadata->keyRange, &oldMutations, &mutations, startState.changeFeedStartVersion)) {
					// if old change feed has caught up with where new one would start, finish last one and start new
					// one

					Key cfKey = StringRef(startState.granuleID.toString());
					changeFeedFuture = bwData->db->getChangeFeedStream(
					    changeFeedStream, cfKey, startState.changeFeedStartVersion, MAX_VERSION, metadata->keyRange);
					oldChangeFeedFuture.cancel();
					lastFromOldChangeFeed = true;

					// now that old change feed is cancelled, clear out any mutations still in buffer by replacing
					// promise stream
					oldChangeFeedStream = makeReference<ChangeFeedData>();
				}
			} else {
				Standalone<VectorRef<MutationsAndVersionRef>> newMutations =
				    waitNext(changeFeedStream->mutations.getFuture());
				mutations = newMutations;
			}

			// process mutations
			for (MutationsAndVersionRef d : mutations) {
				state MutationsAndVersionRef deltas = d;
				ASSERT(deltas.version >= lastVersion);
				ASSERT(lastVersion > metadata->bufferedDeltaVersion.get());

				// if lastVersion is complete, update buffered version and potentially write a delta file with
				// everything up to lastVersion
				if (deltas.version > lastVersion) {
					metadata->bufferedDeltaVersion.set(lastVersion);
				}
				// Write a new delta file IF we have enough bytes, and we have all of the previous version's stuff
				// there to ensure no versions span multiple delta files. Check this by ensuring the version of this
				// new delta is larger than the previous largest seen version
				if (metadata->bufferedDeltaBytes >= SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES &&
				    deltas.version > lastVersion) {
					if (BW_DEBUG) {
						fmt::print("Granule [{0} - {1}) flushing delta file after {2} bytes @ {3} {4}{5}\n",
						           metadata->keyRange.begin.printable(),
						           metadata->keyRange.end.printable(),
						           metadata->bufferedDeltaBytes,
						           lastVersion,
						           deltas.version,
						           oldChangeFeedDataComplete.present() ? ". Finalizing " : "");
					}
					TraceEvent("BlobGranuleDeltaFile", bwData->id)
					    .detail("Granule", metadata->keyRange)
					    .detail("Version", lastVersion);

					// sanity check for version order
					// NOTE(review): currentDeltas.back() assumes the buffer is non-empty;
					// presumably bufferedDeltaBytes > 0 implies that — confirm.
					ASSERT(lastVersion >= metadata->currentDeltas.back().version);
					ASSERT(metadata->pendingDeltaVersion < metadata->currentDeltas.front().version);

					// launch pipelined, but wait for previous operation to complete before persisting to FDB
					Future<BlobFileIndex> previousDeltaFileFuture;
					if (inFlightBlobSnapshot.isValid() && inFlightDeltaFiles.empty()) {
						previousDeltaFileFuture = inFlightBlobSnapshot;
					} else if (!inFlightDeltaFiles.empty()) {
						previousDeltaFileFuture = inFlightDeltaFiles.back().future;
					} else {
						previousDeltaFileFuture = Future<BlobFileIndex>(BlobFileIndex());
					}
					Future<BlobFileIndex> dfFuture = writeDeltaFile(bwData,
					                                                metadata->keyRange,
					                                                startState.granuleID,
					                                                metadata->originalEpoch,
					                                                metadata->originalSeqno,
					                                                metadata->deltaArena,
					                                                metadata->currentDeltas,
					                                                lastVersion,
					                                                previousDeltaFileFuture,
					                                                oldChangeFeedDataComplete);
					inFlightDeltaFiles.push_back(
					    InFlightDeltaFile(dfFuture, lastVersion, metadata->bufferedDeltaBytes));

					// The granule-split "done" marker rides on exactly one delta file write.
					oldChangeFeedDataComplete.reset();
					// add new pending delta file
					ASSERT(metadata->pendingDeltaVersion < lastVersion);
					metadata->pendingDeltaVersion = lastVersion;
					metadata->bytesInNewDeltaFiles += metadata->bufferedDeltaBytes;

					bwData->stats.mutationBytesBuffered -= metadata->bufferedDeltaBytes;

					// reset current deltas
					metadata->deltaArena = Arena();
					metadata->currentDeltas = GranuleDeltas();
					metadata->bufferedDeltaBytes = 0;

					// if we just wrote a delta file, check if we need to compact here.
					// exhaust old change feed before compacting - otherwise we could end up with an endlessly
					// growing list of previous change feeds in the worst case.
					snapshotEligible = true;
				}

				// FIXME: if we're still reading from old change feed, we should probably compact if we're making a
				// bunch of extra delta files at some point, even if we don't consider it for a split yet
				if (snapshotEligible && metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT &&
				    !readOldChangeFeed) {
					if (BW_DEBUG && (inFlightBlobSnapshot.isValid() || !inFlightDeltaFiles.empty())) {
						fmt::print(
						    "Granule [{0} - {1}) ready to re-snapshot, waiting for outstanding {2} snapshot and {3} "
						    "deltas to "
						    "finish\n",
						    metadata->keyRange.begin.printable(),
						    metadata->keyRange.end.printable(),
						    inFlightBlobSnapshot.isValid() ? 1 : 0,
						    inFlightDeltaFiles.size());
					}
					// wait for all in flight snapshot/delta files
					if (inFlightBlobSnapshot.isValid()) {
						BlobFileIndex completedSnapshot = wait(inFlightBlobSnapshot);
						metadata->files.snapshotFiles.push_back(completedSnapshot);
						metadata->durableSnapshotVersion.set(completedSnapshot.version);
						inFlightBlobSnapshot = Future<BlobFileIndex>(); // not valid!
						wait(yield(TaskPriority::BlobWorkerUpdateStorage));
					}
					for (auto& it : inFlightDeltaFiles) {
						BlobFileIndex completedDeltaFile = wait(it.future);
						wait(handleCompletedDeltaFile(bwData,
						                              metadata,
						                              completedDeltaFile,
						                              cfKey,
						                              startState.changeFeedStartVersion,
						                              rollbacksCompleted));
						wait(yield(TaskPriority::BlobWorkerUpdateStorage));
					}
					inFlightDeltaFiles.clear();

					if (BW_DEBUG) {
						fmt::print("Granule [{0} - {1}) checking with BM for re-snapshot after {2} bytes\n",
						           metadata->keyRange.begin.printable(),
						           metadata->keyRange.end.printable(),
						           metadata->bytesInNewDeltaFiles);
					}

					TraceEvent("BlobGranuleSnapshotCheck", bwData->id)
					    .detail("Granule", metadata->keyRange)
					    .detail("Version", metadata->durableDeltaVersion.get());

					// Save these from the start so repeated requests are idempotent
					// Need to retry in case response is dropped or manager changes. Eventually, a manager will
					// either reassign the range with continue=true, or will revoke the range. But, we will keep the
					// range open at this version for reads until that assignment change happens
					metadata->resumeSnapshot.reset();
					state int64_t statusEpoch = metadata->continueEpoch;
					state int64_t statusSeqno = metadata->continueSeqno;
					// Outer loop: re-send status until the manager tells us to resume snapshotting.
					// Inner loop: retry the send itself across manager status stream changes.
					loop {
						loop {
							try {
								wait(bwData->currentManagerStatusStream.get().onReady());
								bwData->currentManagerStatusStream.get().send(
								    GranuleStatusReply(metadata->keyRange,
								                       true,
								                       statusEpoch,
								                       statusSeqno,
								                       startState.granuleID,
								                       startState.history.get().version,
								                       metadata->durableDeltaVersion.get()));
								break;
							} catch (Error& e) {
								// Stream to the old manager broke; wait for a new manager's stream.
								wait(bwData->currentManagerStatusStream.onChange());
							}
						}

						choose {
							when(wait(metadata->resumeSnapshot.getFuture())) { break; }
							when(wait(delay(1.0))) {}
							when(wait(bwData->currentManagerStatusStream.onChange())) {}
						}

						if (BW_DEBUG) {
							fmt::print(
							    "Granule [{0} - {1})\n, hasn't heard back from BM in BW {2}, re-sending status\n",
							    metadata->keyRange.begin.printable(),
							    metadata->keyRange.end.printable(),
							    bwData->id.toString());
						}
					}

					if (BW_DEBUG) {
						fmt::print("Granule [{0} - {1}) re-snapshotting after {2} bytes\n",
						           metadata->keyRange.begin.printable(),
						           metadata->keyRange.end.printable(),
						           metadata->bytesInNewDeltaFiles);
					}
					TraceEvent("BlobGranuleSnapshotFile", bwData->id)
					    .detail("Granule", metadata->keyRange)
					    .detail("Version", metadata->durableDeltaVersion.get());
					// TODO: this could read from FDB instead if it knew there was a large range clear at the end or
					// it knew the granule was small, or something

					// Have to copy files object so that adding to it as we start writing new delta files in
					// parallel doesn't conflict. We could also pass the snapshot version and ignore any snapshot
					// files >= version and any delta files > version, but that's more complicated
					inFlightBlobSnapshot = compactFromBlob(bwData, metadata, startState.granuleID, metadata->files);
					metadata->pendingSnapshotVersion = metadata->durableDeltaVersion.get();

					// reset metadata
					metadata->bytesInNewDeltaFiles = 0;
				} else if (snapshotEligible &&
				           metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) {
					// if we're in the old change feed case and can't snapshot but we have enough data to, don't
					// queue too many delta files in parallel
					while (inFlightDeltaFiles.size() > 10) {
						if (BW_DEBUG) {
							printf("[%s - %s) Waiting on delta file b/c old change feed\n",
							       metadata->keyRange.begin.printable().c_str(),
							       metadata->keyRange.end.printable().c_str());
						}
						BlobFileIndex completedDeltaFile = wait(inFlightDeltaFiles.front().future);
						if (BW_DEBUG) {
							printf("  [%s - %s) Got completed delta file\n",
							       metadata->keyRange.begin.printable().c_str(),
							       metadata->keyRange.end.printable().c_str());
						}
						wait(handleCompletedDeltaFile(bwData,
						                              metadata,
						                              completedDeltaFile,
						                              cfKey,
						                              startState.changeFeedStartVersion,
						                              rollbacksCompleted));
						wait(yield(TaskPriority::BlobWorkerUpdateStorage));
						inFlightDeltaFiles.pop_front();
					}
				}
				snapshotEligible = false;

				wait(yield(TaskPriority::BlobWorkerReadChangeFeed));

				// finally, after we optionally write delta and snapshot files, add new mutations to buffer
				if (!deltas.mutations.empty()) {
					// A single private mutation on lastEpochEndPrivateKey is a rollback marker;
					// its param2 encodes the version to roll back to.
					if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) {
						// Note rollbackVerision is durable, [rollbackVersion+1 - deltas.version] needs to be tossed
						// For correctness right now, there can be no waits and yields either in rollback handling
						// or in handleBlobGranuleFileRequest once waitForVersion has succeeded, otherwise this will
						// race and clobber results
						Version rollbackVersion;
						BinaryReader br(deltas.mutations[0].param2, Unversioned());
						br >> rollbackVersion;

						// FIXME: THIS IS FALSE!! delta can commit by getting committed version out of band, without
						// seeing rollback mutation.
						ASSERT(rollbackVersion >= metadata->durableDeltaVersion.get());

						if (!rollbacksInProgress.empty()) {
							// Re-encountered a rollback we already applied while replaying the
							// feed; just mark it completed.
							ASSERT(rollbacksInProgress.front().first == rollbackVersion);
							ASSERT(rollbacksInProgress.front().second == deltas.version);
							fmt::print("Passed rollback {0} -> {1}\n", deltas.version, rollbackVersion);
							rollbacksCompleted.push_back(rollbacksInProgress.front());
							rollbacksInProgress.pop_front();
						} else {
							// FIXME: add counter for granule rollbacks and rollbacks skipped?
							// explicitly check last delta in currentDeltas because lastVersion and bufferedDeltaVersion
							// include empties
							if (metadata->pendingDeltaVersion <= rollbackVersion &&
							    (metadata->currentDeltas.empty() ||
							     metadata->currentDeltas.back().version <= rollbackVersion)) {

								// Nothing buffered or pending is newer than rollbackVersion, so
								// there is nothing to undo.
								if (BW_DEBUG) {
									fmt::print("BW skipping rollback {0} -> {1} completely\n",
									           deltas.version,
									           rollbackVersion);
								}
							} else {
								if (BW_DEBUG) {
									fmt::print("BW [{0} - {1}) ROLLBACK @ {2} -> {3}\n",
									           metadata->keyRange.begin.printable(),
									           metadata->keyRange.end.printable(),
									           deltas.version,
									           rollbackVersion);
									TraceEvent(SevWarn, "GranuleRollback", bwData->id)
									    .detail("Granule", metadata->keyRange)
									    .detail("Version", deltas.version)
									    .detail("RollbackVersion", rollbackVersion);
								}
								// Discard buffered/pending state past rollbackVersion and get the
								// version the change feed must be re-opened from.
								Version cfRollbackVersion = doGranuleRollback(metadata,
								                                              deltas.version,
								                                              rollbackVersion,
								                                              inFlightDeltaFiles,
								                                              rollbacksInProgress,
								                                              rollbacksCompleted);

								// reset change feeds to cfRollbackVersion
								if (readOldChangeFeed) {
									oldChangeFeedStream = makeReference<ChangeFeedData>();
									oldChangeFeedFuture = bwData->db->getChangeFeedStream(
									    oldChangeFeedStream,
									    oldCFKey.get(),
									    cfRollbackVersion + 1,
									    MAX_VERSION,
									    startState.parentGranule.get().first /*metadata->keyRange*/);
								} else {
									changeFeedStream = makeReference<ChangeFeedData>();
									changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream,
									                                                   cfKey,
									                                                   cfRollbackVersion + 1,
									                                                   MAX_VERSION,
									                                                   metadata->keyRange);
								}
								justDidRollback = true;
								// Abandon the rest of this mutation batch; it predates the reset.
								break;
							}
						}
					} else if (!rollbacksInProgress.empty() && rollbacksInProgress.front().first < deltas.version &&
					           rollbacksInProgress.front().second > deltas.version) {
						// Mutations inside a known rolled-back window are replay noise; drop them.
						if (BW_DEBUG) {
							fmt::print("Skipping mutations @ {} b/c prior rollback\n", deltas.version);
						}
					} else {
						// Normal path: account bytes and append the batch to the delta buffer.
						for (auto& delta : deltas.mutations) {
							metadata->bufferedDeltaBytes += delta.totalSize();
							bwData->stats.changeFeedInputBytes += delta.totalSize();
							bwData->stats.mutationBytesBuffered += delta.totalSize();

							DEBUG_MUTATION("BlobWorkerBuffer", deltas.version, delta, bwData->id)
							    .detail("Granule", metadata->keyRange)
							    .detail("ChangeFeedID", readOldChangeFeed ? oldCFKey.get() : cfKey)
							    .detail("OldChangeFeed", readOldChangeFeed ? "T" : "F");
						}
						metadata->currentDeltas.push_back_deep(metadata->deltaArena, deltas);
					}
				}
				if (justDidRollback) {
					break;
				}
				lastVersion = deltas.version;
			}
			// Completed the final batch from the parent's change feed (and didn't roll back
			// mid-batch): switch over to this granule's own feed.
			if (lastFromOldChangeFeed && !justDidRollback) {
				readOldChangeFeed = false;
				lastFromOldChangeFeed = false;
				// set this so next delta file write updates granule split metadata to done
				ASSERT(startState.parentGranule.present());
				oldChangeFeedDataComplete = startState.parentGranule.get();
				if (BW_DEBUG) {
					fmt::print("Granule [{0} - {1}) switching to new change feed {2} @ {3}\n",
					           metadata->keyRange.begin.printable(),
					           metadata->keyRange.end.printable(),
					           startState.granuleID.toString(),
					           metadata->bufferedDeltaVersion.get());
				}
			}
			justDidRollback = false;
		}
	} catch (Error& e) {
		// Actor cancellation propagates untouched.
		if (e.code() == error_code_operation_cancelled) {
			throw;
		}

		// Wake anything waiting on this granule (e.g. readers blocked on waitForVersion).
		if (metadata->cancelled.canBeSet()) {
			metadata->cancelled.send(Void());
		}

		if (e.code() == error_code_granule_assignment_conflict) {
			// Another worker owns the granule now; expected during reassignment, not an error.
			TraceEvent(SevInfo, "GranuleAssignmentConflict", bwData->id).detail("Granule", metadata->keyRange);
		} else {
			++bwData->stats.granuleUpdateErrors;
			if (BW_DEBUG) {
				printf("Granule file updater for [%s - %s) got error %s, exiting\n",
				       metadata->keyRange.begin.printable().c_str(),
				       metadata->keyRange.end.printable().c_str(),
				       e.name());
			}
			TraceEvent(SevWarn, "GranuleFileUpdaterError", bwData->id).detail("Granule", metadata->keyRange).error(e);

			if (granuleCanRetry(e)) {
				// explicitly cancel all outstanding write futures BEFORE updating promise stream, to ensure they
				// can't update files after the re-assigned granule acquires the lock
				inFlightBlobSnapshot.cancel();
				for (auto& f : inFlightDeltaFiles) {
					f.future.cancel();
				}

				// Hand the original assignment request back so the worker retries this granule.
				bwData->granuleUpdateErrors.send(metadata->originalReq);
			}
		}
		throw e;
	}
}