in fdbserver/BlobWorker.actor.cpp [1056:1607]
// Main per-granule file updater. Consumes the granule's change feed, buffers mutations in
// memory, and persists them as delta files in blob storage; periodically re-snapshots the
// granule (negotiating with the blob manager), handles rollback mutations from the change
// feed, and manages the old->new change feed handoff after a granule split.
//
// bwData:       shared per-worker state (DB handle, stats, manager status stream).
// metadata:     this granule's mutable state (files, buffered deltas, version watermarks).
// assignFuture: resolves once the worker has persisted the range assignment and acquired
//               the granule lock; carries the start state (existing files, parent CF, etc.).
//
// Throws granule_assignment_conflict when another worker takes the granule; retryable
// errors are reported back through bwData->granuleUpdateErrors before being rethrown.
ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
                                          Reference<GranuleMetadata> metadata,
                                          Future<GranuleStartState> assignFuture) {
    state Reference<ChangeFeedData> oldChangeFeedStream = makeReference<ChangeFeedData>();
    state Reference<ChangeFeedData> changeFeedStream = makeReference<ChangeFeedData>();
    state Future<BlobFileIndex> inFlightBlobSnapshot;
    state std::deque<InFlightDeltaFile> inFlightDeltaFiles;
    state Future<Void> oldChangeFeedFuture;
    state Future<Void> changeFeedFuture;
    state GranuleStartState startState;
    state bool readOldChangeFeed;
    state bool lastFromOldChangeFeed = false;
    state Optional<std::pair<KeyRange, UID>> oldChangeFeedDataComplete;
    state Key cfKey;
    state Optional<Key> oldCFKey;
    state std::deque<std::pair<Version, Version>> rollbacksInProgress;
    state std::deque<std::pair<Version, Version>> rollbacksCompleted;
    // just wrote a delta file or just took granule over from another worker
    // FIX: initialize to false - previously left indeterminate (read at the re-snapshot
    // check below without ever being assigned when startState.existingFiles is absent)
    state bool snapshotEligible = false;
    state bool justDidRollback = false;
    try {
        // set resume snapshot so it's not valid until we pause to ask the blob manager for a re-snapshot
        metadata->resumeSnapshot.send(Void());
        // before starting, make sure worker persists range assignment and acquires the granule lock
        GranuleStartState _info = wait(assignFuture);
        startState = _info;
        wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));

        // change feed IDs are the granule UIDs; the parent granule's feed (if any) is read
        // first until it catches up to this granule's feed start version
        cfKey = StringRef(startState.granuleID.toString());
        if (startState.parentGranule.present()) {
            oldCFKey = StringRef(startState.parentGranule.get().second.toString());
        }

        if (BW_DEBUG) {
            fmt::print("Granule File Updater Starting for [{0} - {1}):\n",
                       metadata->keyRange.begin.printable(),
                       metadata->keyRange.end.printable());
            fmt::print("  CFID: {}\n", startState.granuleID.toString());
            fmt::print("  CF Start Version: {}\n", startState.changeFeedStartVersion);
            fmt::print("  Previous Durable Version: {}\n", startState.previousDurableVersion);
            fmt::print("  doSnapshot={}\n", startState.doSnapshot ? "T" : "F");
            fmt::print("  Prev CFID: {}\n",
                       startState.parentGranule.present() ? startState.parentGranule.get().second.toString().c_str()
                                                          : "");
            fmt::print("  blobFilesToSnapshot={}\n", startState.blobFilesToSnapshot.present() ? "T" : "F");
        }

        state Version startVersion;
        state BlobFileIndex newSnapshotFile;
        inFlightBlobSnapshot = Future<BlobFileIndex>(); // not valid!

        // if this is a reassign, calculate how close to a snapshot the previous owner was
        if (startState.existingFiles.present()) {
            GranuleFiles files = startState.existingFiles.get();
            if (!files.snapshotFiles.empty() && !files.deltaFiles.empty()) {
                // credit all delta bytes written since the last snapshot toward the
                // re-snapshot threshold
                Version snapshotVersion = files.snapshotFiles.back().version;
                for (int i = files.deltaFiles.size() - 1; i >= 0; i--) {
                    if (files.deltaFiles[i].version > snapshotVersion) {
                        metadata->bytesInNewDeltaFiles += files.deltaFiles[i].length;
                    }
                }
            }
            metadata->files = startState.existingFiles.get();
            snapshotEligible = true;
        }

        // Establish the initial snapshot/version watermarks. Three cases:
        //  1. no snapshot needed: resume from the previous owner's durable state
        //  2. re-snapshot from existing blob files (async, pipelined with delta writes)
        //  3. brand-new granule: dump an initial snapshot from FDB (blocking)
        if (!startState.doSnapshot) {
            startVersion = startState.previousDurableVersion;
            ASSERT(!metadata->files.snapshotFiles.empty());
            metadata->pendingSnapshotVersion = metadata->files.snapshotFiles.back().version;
            metadata->durableSnapshotVersion.set(metadata->pendingSnapshotVersion);
        } else {
            if (startState.blobFilesToSnapshot.present()) {
                inFlightBlobSnapshot =
                    compactFromBlob(bwData, metadata, startState.granuleID, startState.blobFilesToSnapshot.get());
                startVersion = startState.previousDurableVersion;
                metadata->durableSnapshotVersion.set(startState.blobFilesToSnapshot.get().snapshotFiles.back().version);
            } else {
                ASSERT(startState.previousDurableVersion == invalidVersion);
                BlobFileIndex fromFDB = wait(dumpInitialSnapshotFromFDB(bwData, metadata, startState.granuleID));
                newSnapshotFile = fromFDB;
                ASSERT(startState.changeFeedStartVersion <= fromFDB.version);
                startVersion = newSnapshotFile.version;
                metadata->files.snapshotFiles.push_back(newSnapshotFile);
                metadata->durableSnapshotVersion.set(startVersion);

                // construct fake history entry so we can store start version for splitting later
                startState.history =
                    GranuleHistory(metadata->keyRange, startVersion, Standalone<BlobGranuleHistoryValue>());

                wait(yield(TaskPriority::BlobWorkerUpdateStorage));
            }
            metadata->pendingSnapshotVersion = startVersion;
        }

        metadata->durableDeltaVersion.set(startVersion);
        metadata->pendingDeltaVersion = startVersion;
        metadata->bufferedDeltaVersion.set(startVersion);

        // the granule can now serve reads (reads will still wait on the requested version)
        ASSERT(metadata->readable.canBeSet());
        metadata->readable.send(Void());

        if (startState.parentGranule.present()) {
            // FIXME: once we have empty versions, only include up to startState.changeFeedStartVersion in the read
            // stream. Then we can just stop the old stream when we get end_of_stream from this and not handle the
            // mutation version truncation stuff

            // FIXME: filtering on key range != change feed range doesn't work
            readOldChangeFeed = true;
            oldChangeFeedFuture =
                bwData->db->getChangeFeedStream(oldChangeFeedStream,
                                                oldCFKey.get(),
                                                startVersion + 1,
                                                MAX_VERSION,
                                                startState.parentGranule.get().first /*metadata->keyRange*/);
        } else {
            readOldChangeFeed = false;
            changeFeedFuture = bwData->db->getChangeFeedStream(
                changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange);
        }

        state Version lastVersion = startVersion + 1;
        loop {
            // check outstanding snapshot/delta files for completion
            if (inFlightBlobSnapshot.isValid() && inFlightBlobSnapshot.isReady()) {
                BlobFileIndex completedSnapshot = wait(inFlightBlobSnapshot);
                metadata->files.snapshotFiles.push_back(completedSnapshot);
                metadata->durableSnapshotVersion.set(completedSnapshot.version);
                inFlightBlobSnapshot = Future<BlobFileIndex>(); // not valid!
                if (BW_DEBUG) {
                    printf("Async Blob Snapshot completed for [%s - %s)\n",
                           metadata->keyRange.begin.printable().c_str(),
                           metadata->keyRange.end.printable().c_str());
                }
                wait(yield(TaskPriority::BlobWorkerUpdateStorage));
            }
            // delta files may only be finalized once no snapshot is outstanding, so the
            // files list stays in version order
            if (!inFlightBlobSnapshot.isValid()) {
                while (inFlightDeltaFiles.size() > 0) {
                    if (inFlightDeltaFiles.front().future.isReady()) {
                        BlobFileIndex completedDeltaFile = wait(inFlightDeltaFiles.front().future);
                        wait(handleCompletedDeltaFile(bwData,
                                                      metadata,
                                                      completedDeltaFile,
                                                      cfKey,
                                                      startState.changeFeedStartVersion,
                                                      rollbacksCompleted));

                        inFlightDeltaFiles.pop_front();
                        wait(yield(TaskPriority::BlobWorkerUpdateStorage));
                    } else {
                        break;
                    }
                }
            }

            // inject delay into reading change feed stream
            if (BUGGIFY_WITH_PROB(0.001)) {
                wait(delay(deterministicRandom()->random01(), TaskPriority::BlobWorkerReadChangeFeed));
            } else {
                wait(delay(0, TaskPriority::BlobWorkerReadChangeFeed));
            }

            // pull the next batch of mutations from whichever feed is active
            state Standalone<VectorRef<MutationsAndVersionRef>> mutations;
            if (readOldChangeFeed) {
                Standalone<VectorRef<MutationsAndVersionRef>> oldMutations =
                    waitNext(oldChangeFeedStream->mutations.getFuture());
                // TODO filter old mutations won't be necessary, SS does it already
                if (filterOldMutations(
                        metadata->keyRange, &oldMutations, &mutations, startState.changeFeedStartVersion)) {
                    // if old change feed has caught up with where new one would start, finish last one and start new
                    // one
                    changeFeedFuture = bwData->db->getChangeFeedStream(
                        changeFeedStream, cfKey, startState.changeFeedStartVersion, MAX_VERSION, metadata->keyRange);
                    oldChangeFeedFuture.cancel();
                    lastFromOldChangeFeed = true;

                    // now that old change feed is cancelled, clear out any mutations still in buffer by replacing
                    // promise stream
                    oldChangeFeedStream = makeReference<ChangeFeedData>();
                }
            } else {
                Standalone<VectorRef<MutationsAndVersionRef>> newMutations =
                    waitNext(changeFeedStream->mutations.getFuture());
                mutations = newMutations;
            }

            // process mutations
            for (MutationsAndVersionRef d : mutations) {
                state MutationsAndVersionRef deltas = d;
                ASSERT(deltas.version >= lastVersion);
                ASSERT(lastVersion > metadata->bufferedDeltaVersion.get());

                // if lastVersion is complete, update buffered version and potentially write a delta file with
                // everything up to lastVersion
                if (deltas.version > lastVersion) {
                    metadata->bufferedDeltaVersion.set(lastVersion);
                }

                // Write a new delta file IF we have enough bytes, and we have all of the previous version's stuff
                // there to ensure no versions span multiple delta files. Check this by ensuring the version of this
                // new delta is larger than the previous largest seen version
                if (metadata->bufferedDeltaBytes >= SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES &&
                    deltas.version > lastVersion) {
                    if (BW_DEBUG) {
                        fmt::print("Granule [{0} - {1}) flushing delta file after {2} bytes @ {3} {4}{5}\n",
                                   metadata->keyRange.begin.printable(),
                                   metadata->keyRange.end.printable(),
                                   metadata->bufferedDeltaBytes,
                                   lastVersion,
                                   deltas.version,
                                   oldChangeFeedDataComplete.present() ? ". Finalizing " : "");
                    }
                    TraceEvent("BlobGranuleDeltaFile", bwData->id)
                        .detail("Granule", metadata->keyRange)
                        .detail("Version", lastVersion);

                    // sanity check for version order
                    ASSERT(lastVersion >= metadata->currentDeltas.back().version);
                    ASSERT(metadata->pendingDeltaVersion < metadata->currentDeltas.front().version);

                    // launch pipelined, but wait for previous operation to complete before persisting to FDB
                    Future<BlobFileIndex> previousDeltaFileFuture;
                    if (inFlightBlobSnapshot.isValid() && inFlightDeltaFiles.empty()) {
                        previousDeltaFileFuture = inFlightBlobSnapshot;
                    } else if (!inFlightDeltaFiles.empty()) {
                        previousDeltaFileFuture = inFlightDeltaFiles.back().future;
                    } else {
                        previousDeltaFileFuture = Future<BlobFileIndex>(BlobFileIndex());
                    }
                    Future<BlobFileIndex> dfFuture = writeDeltaFile(bwData,
                                                                    metadata->keyRange,
                                                                    startState.granuleID,
                                                                    metadata->originalEpoch,
                                                                    metadata->originalSeqno,
                                                                    metadata->deltaArena,
                                                                    metadata->currentDeltas,
                                                                    lastVersion,
                                                                    previousDeltaFileFuture,
                                                                    oldChangeFeedDataComplete);
                    inFlightDeltaFiles.push_back(
                        InFlightDeltaFile(dfFuture, lastVersion, metadata->bufferedDeltaBytes));

                    oldChangeFeedDataComplete.reset();
                    // add new pending delta file
                    ASSERT(metadata->pendingDeltaVersion < lastVersion);
                    metadata->pendingDeltaVersion = lastVersion;
                    metadata->bytesInNewDeltaFiles += metadata->bufferedDeltaBytes;

                    bwData->stats.mutationBytesBuffered -= metadata->bufferedDeltaBytes;

                    // reset current deltas
                    metadata->deltaArena = Arena();
                    metadata->currentDeltas = GranuleDeltas();
                    metadata->bufferedDeltaBytes = 0;

                    // if we just wrote a delta file, check if we need to compact here.
                    // exhaust old change feed before compacting - otherwise we could end up with an endlessly
                    // growing list of previous change feeds in the worst case.
                    snapshotEligible = true;
                }

                // FIXME: if we're still reading from old change feed, we should probably compact if we're making a
                // bunch of extra delta files at some point, even if we don't consider it for a split yet
                if (snapshotEligible && metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT &&
                    !readOldChangeFeed) {
                    if (BW_DEBUG && (inFlightBlobSnapshot.isValid() || !inFlightDeltaFiles.empty())) {
                        fmt::print(
                            "Granule [{0} - {1}) ready to re-snapshot, waiting for outstanding {2} snapshot and {3} "
                            "deltas to "
                            "finish\n",
                            metadata->keyRange.begin.printable(),
                            metadata->keyRange.end.printable(),
                            inFlightBlobSnapshot.isValid() ? 1 : 0,
                            inFlightDeltaFiles.size());
                    }
                    // wait for all in flight snapshot/delta files
                    if (inFlightBlobSnapshot.isValid()) {
                        BlobFileIndex completedSnapshot = wait(inFlightBlobSnapshot);
                        metadata->files.snapshotFiles.push_back(completedSnapshot);
                        metadata->durableSnapshotVersion.set(completedSnapshot.version);
                        inFlightBlobSnapshot = Future<BlobFileIndex>(); // not valid!
                        wait(yield(TaskPriority::BlobWorkerUpdateStorage));
                    }
                    for (auto& it : inFlightDeltaFiles) {
                        BlobFileIndex completedDeltaFile = wait(it.future);
                        wait(handleCompletedDeltaFile(bwData,
                                                      metadata,
                                                      completedDeltaFile,
                                                      cfKey,
                                                      startState.changeFeedStartVersion,
                                                      rollbacksCompleted));
                        wait(yield(TaskPriority::BlobWorkerUpdateStorage));
                    }
                    inFlightDeltaFiles.clear();

                    if (BW_DEBUG) {
                        fmt::print("Granule [{0} - {1}) checking with BM for re-snapshot after {2} bytes\n",
                                   metadata->keyRange.begin.printable(),
                                   metadata->keyRange.end.printable(),
                                   metadata->bytesInNewDeltaFiles);
                    }
                    TraceEvent("BlobGranuleSnapshotCheck", bwData->id)
                        .detail("Granule", metadata->keyRange)
                        .detail("Version", metadata->durableDeltaVersion.get());

                    // Save these from the start so repeated requests are idempotent
                    // Need to retry in case response is dropped or manager changes. Eventually, a manager will
                    // either reassign the range with continue=true, or will revoke the range. But, we will keep the
                    // range open at this version for reads until that assignment change happens
                    metadata->resumeSnapshot.reset();
                    state int64_t statusEpoch = metadata->continueEpoch;
                    state int64_t statusSeqno = metadata->continueSeqno;
                    loop {
                        // (re-)send the granule status to the current manager until it
                        // tells us to resume by firing resumeSnapshot
                        loop {
                            try {
                                wait(bwData->currentManagerStatusStream.get().onReady());
                                bwData->currentManagerStatusStream.get().send(
                                    GranuleStatusReply(metadata->keyRange,
                                                       true,
                                                       statusEpoch,
                                                       statusSeqno,
                                                       startState.granuleID,
                                                       startState.history.get().version,
                                                       metadata->durableDeltaVersion.get()));
                                break;
                            } catch (Error& e) {
                                // manager changed mid-send; wait for the new status stream
                                wait(bwData->currentManagerStatusStream.onChange());
                            }
                        }

                        choose {
                            when(wait(metadata->resumeSnapshot.getFuture())) { break; }
                            when(wait(delay(1.0))) {}
                            when(wait(bwData->currentManagerStatusStream.onChange())) {}
                        }

                        if (BW_DEBUG) {
                            fmt::print(
                                "Granule [{0} - {1})\n, hasn't heard back from BM in BW {2}, re-sending status\n",
                                metadata->keyRange.begin.printable(),
                                metadata->keyRange.end.printable(),
                                bwData->id.toString());
                        }
                    }

                    if (BW_DEBUG) {
                        fmt::print("Granule [{0} - {1}) re-snapshotting after {2} bytes\n",
                                   metadata->keyRange.begin.printable(),
                                   metadata->keyRange.end.printable(),
                                   metadata->bytesInNewDeltaFiles);
                    }
                    TraceEvent("BlobGranuleSnapshotFile", bwData->id)
                        .detail("Granule", metadata->keyRange)
                        .detail("Version", metadata->durableDeltaVersion.get());
                    // TODO: this could read from FDB instead if it knew there was a large range clear at the end or
                    // it knew the granule was small, or something

                    // Have to copy files object so that adding to it as we start writing new delta files in
                    // parallel doesn't conflict. We could also pass the snapshot version and ignore any snapshot
                    // files >= version and any delta files > version, but that's more complicated
                    inFlightBlobSnapshot = compactFromBlob(bwData, metadata, startState.granuleID, metadata->files);
                    metadata->pendingSnapshotVersion = metadata->durableDeltaVersion.get();

                    // reset metadata
                    metadata->bytesInNewDeltaFiles = 0;
                } else if (snapshotEligible &&
                           metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) {
                    // if we're in the old change feed case and can't snapshot but we have enough data to, don't
                    // queue too many delta files in parallel
                    while (inFlightDeltaFiles.size() > 10) {
                        if (BW_DEBUG) {
                            printf("[%s - %s) Waiting on delta file b/c old change feed\n",
                                   metadata->keyRange.begin.printable().c_str(),
                                   metadata->keyRange.end.printable().c_str());
                        }
                        BlobFileIndex completedDeltaFile = wait(inFlightDeltaFiles.front().future);
                        if (BW_DEBUG) {
                            printf("  [%s - %s) Got completed delta file\n",
                                   metadata->keyRange.begin.printable().c_str(),
                                   metadata->keyRange.end.printable().c_str());
                        }
                        wait(handleCompletedDeltaFile(bwData,
                                                      metadata,
                                                      completedDeltaFile,
                                                      cfKey,
                                                      startState.changeFeedStartVersion,
                                                      rollbacksCompleted));
                        wait(yield(TaskPriority::BlobWorkerUpdateStorage));
                        inFlightDeltaFiles.pop_front();
                    }
                }
                snapshotEligible = false;

                wait(yield(TaskPriority::BlobWorkerReadChangeFeed));

                // finally, after we optionally write delta and snapshot files, add new mutations to buffer
                if (!deltas.mutations.empty()) {
                    if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) {
                        // this batch is a private rollback mutation from recovery
                        // Note rollbackVerision is durable, [rollbackVersion+1 - deltas.version] needs to be tossed
                        // For correctness right now, there can be no waits and yields either in rollback handling
                        // or in handleBlobGranuleFileRequest once waitForVersion has succeeded, otherwise this will
                        // race and clobber results
                        Version rollbackVersion;
                        BinaryReader br(deltas.mutations[0].param2, Unversioned());
                        br >> rollbackVersion;

                        // FIXME: THIS IS FALSE!! delta can commit by getting committed version out of band, without
                        // seeing rollback mutation.
                        ASSERT(rollbackVersion >= metadata->durableDeltaVersion.get());

                        if (!rollbacksInProgress.empty()) {
                            // we already replayed past this rollback; just mark it passed
                            ASSERT(rollbacksInProgress.front().first == rollbackVersion);
                            ASSERT(rollbacksInProgress.front().second == deltas.version);
                            if (BW_DEBUG) {
                                fmt::print("Passed rollback {0} -> {1}\n", deltas.version, rollbackVersion);
                            }
                            rollbacksCompleted.push_back(rollbacksInProgress.front());
                            rollbacksInProgress.pop_front();
                        } else {
                            // FIXME: add counter for granule rollbacks and rollbacks skipped?
                            // explicitly check last delta in currentDeltas because lastVersion and bufferedDeltaVersion
                            // include empties
                            if (metadata->pendingDeltaVersion <= rollbackVersion &&
                                (metadata->currentDeltas.empty() ||
                                 metadata->currentDeltas.back().version <= rollbackVersion)) {

                                // everything buffered is already <= the rollback point; nothing to undo
                                if (BW_DEBUG) {
                                    fmt::print("BW skipping rollback {0} -> {1} completely\n",
                                               deltas.version,
                                               rollbackVersion);
                                }
                            } else {
                                if (BW_DEBUG) {
                                    fmt::print("BW [{0} - {1}) ROLLBACK @ {2} -> {3}\n",
                                               metadata->keyRange.begin.printable(),
                                               metadata->keyRange.end.printable(),
                                               deltas.version,
                                               rollbackVersion);
                                }
                                // FIX: trace unconditionally - was previously only emitted when BW_DEBUG was set
                                TraceEvent(SevWarn, "GranuleRollback", bwData->id)
                                    .detail("Granule", metadata->keyRange)
                                    .detail("Version", deltas.version)
                                    .detail("RollbackVersion", rollbackVersion);
                                Version cfRollbackVersion = doGranuleRollback(metadata,
                                                                              deltas.version,
                                                                              rollbackVersion,
                                                                              inFlightDeltaFiles,
                                                                              rollbacksInProgress,
                                                                              rollbacksCompleted);

                                // reset change feeds to cfRollbackVersion
                                if (readOldChangeFeed) {
                                    oldChangeFeedStream = makeReference<ChangeFeedData>();
                                    oldChangeFeedFuture = bwData->db->getChangeFeedStream(
                                        oldChangeFeedStream,
                                        oldCFKey.get(),
                                        cfRollbackVersion + 1,
                                        MAX_VERSION,
                                        startState.parentGranule.get().first /*metadata->keyRange*/);
                                } else {
                                    changeFeedStream = makeReference<ChangeFeedData>();
                                    changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream,
                                                                                       cfKey,
                                                                                       cfRollbackVersion + 1,
                                                                                       MAX_VERSION,
                                                                                       metadata->keyRange);
                                }
                                justDidRollback = true;
                                break;
                            }
                        }
                    } else if (!rollbacksInProgress.empty() && rollbacksInProgress.front().first < deltas.version &&
                               rollbacksInProgress.front().second > deltas.version) {
                        // replaying through a known rollback window: drop these mutations
                        if (BW_DEBUG) {
                            fmt::print("Skipping mutations @ {} b/c prior rollback\n", deltas.version);
                        }
                    } else {
                        for (auto& delta : deltas.mutations) {
                            metadata->bufferedDeltaBytes += delta.totalSize();
                            bwData->stats.changeFeedInputBytes += delta.totalSize();
                            bwData->stats.mutationBytesBuffered += delta.totalSize();

                            DEBUG_MUTATION("BlobWorkerBuffer", deltas.version, delta, bwData->id)
                                .detail("Granule", metadata->keyRange)
                                .detail("ChangeFeedID", readOldChangeFeed ? oldCFKey.get() : cfKey)
                                .detail("OldChangeFeed", readOldChangeFeed ? "T" : "F");
                        }
                        metadata->currentDeltas.push_back_deep(metadata->deltaArena, deltas);
                    }
                }
                if (justDidRollback) {
                    // discard the rest of this batch; feeds were re-opened at the rollback version
                    break;
                }
                lastVersion = deltas.version;
            }
            if (lastFromOldChangeFeed && !justDidRollback) {
                readOldChangeFeed = false;
                lastFromOldChangeFeed = false;
                // set this so next delta file write updates granule split metadata to done
                ASSERT(startState.parentGranule.present());
                oldChangeFeedDataComplete = startState.parentGranule.get();
                if (BW_DEBUG) {
                    fmt::print("Granule [{0} - {1}) switching to new change feed {2} @ {3}\n",
                               metadata->keyRange.begin.printable(),
                               metadata->keyRange.end.printable(),
                               startState.granuleID.toString(),
                               metadata->bufferedDeltaVersion.get());
                }
            }
            justDidRollback = false;
        }
    } catch (Error& e) {
        if (e.code() == error_code_operation_cancelled) {
            throw;
        }

        // wake up anything waiting on this granule before reporting/propagating the error
        if (metadata->cancelled.canBeSet()) {
            metadata->cancelled.send(Void());
        }

        if (e.code() == error_code_granule_assignment_conflict) {
            TraceEvent(SevInfo, "GranuleAssignmentConflict", bwData->id).detail("Granule", metadata->keyRange);
        } else {
            ++bwData->stats.granuleUpdateErrors;
            if (BW_DEBUG) {
                printf("Granule file updater for [%s - %s) got error %s, exiting\n",
                       metadata->keyRange.begin.printable().c_str(),
                       metadata->keyRange.end.printable().c_str(),
                       e.name());
            }
            TraceEvent(SevWarn, "GranuleFileUpdaterError", bwData->id).detail("Granule", metadata->keyRange).error(e);

            if (granuleCanRetry(e)) {
                // explicitly cancel all outstanding write futures BEFORE updating promise stream, to ensure they
                // can't update files after the re-assigned granule acquires the lock
                inFlightBlobSnapshot.cancel();
                for (auto& f : inFlightDeltaFiles) {
                    f.future.cancel();
                }

                bwData->granuleUpdateErrors.send(metadata->originalReq);
            }
        }
        throw e;
    }
}