in fdbserver/storageserver.actor.cpp [5096:5514]
// Runs one round of "pull from the log cursor and apply to the storage server".
//
// Phases, in order:
//   1. E-brake: optionally stall while the queue is at/above STORAGE_HARD_LIMIT_BYTES and durableVersion is behind
//      desiredOldestVersion.
//   2. Simulation-only TSS delay injection, then wait for byteSampleClearsTooLarge to become false.
//   3. Wait on data->logCursor->getMore() until the cursor is not exhausted.
//   4. Take durableVersionLock, pre-scan the peeked messages to collect eager reads, and hand `fii` to any ready
//      fetchKeys (repeated if shardChangeCounter changes while the eager reads are in flight).
//   5. Apply the injected fetchKeys changes and then the peeked mutations through a StorageUpdater.
//   6. Advance data->version / desiredOldestVersion and advance data->logCursor.
//
// Parameters:
//   data            - the storage server whose state is updated.
//   pReceivedUpdate - must point to `false` on entry (asserted); set to `true` once the cursor has produced data and
//                     the popped() check has passed.
//
// Errors:
//   Throws worker_removed() when cursor->popped() > 0. Every error is rethrown from the catch block; errors other than
//   worker_removed / please_reboot are first logged as SSUpdateError (SevError), and on please_reboot the actor waits
//   on data->durableInProgress before rethrowing.
ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
// Timer reused for the lock-acquisition phase and then for the eager-read phase.
state double start;
try {
// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
// This is often referred to as the storage server e-brake (emergency brake)
// We allow the storage server to make some progress between e-brake periods, referred to as "overage", in
// order to ensure that it advances desiredOldestVersion enough for updateStorage to make enough progress on
// freeing up queue size.
// waitStartT is the time of the last StorageServerUpdateLag trace; it limits that trace to once per second.
state double waitStartT = 0;
// Enter the e-brake only if the queue is over the hard limit, durableVersion lags desiredOldestVersion, AND at least
// one overage budget has been used up since the previous e-brake: desiredOldestVersion has moved more than
// STORAGE_HARD_LIMIT_VERSION_OVERAGE past lastDurableVersionEBrake, or bytesInput has grown by more than
// STORAGE_HARD_LIMIT_BYTES_OVERAGE past lastBytesInputEBrake.
if (data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES &&
data->durableVersion.get() < data->desiredOldestVersion.get() &&
((data->desiredOldestVersion.get() - SERVER_KNOBS->STORAGE_HARD_LIMIT_VERSION_OVERAGE >
data->lastDurableVersionEBrake) ||
(data->counters.bytesInput.getValue() - SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES_OVERAGE >
data->lastBytesInputEBrake))) {
// Poll (about every 5ms, jittered) until the queue drops below the hard limit or durableVersion catches up.
while (data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES &&
data->durableVersion.get() < data->desiredOldestVersion.get()) {
if (now() - waitStartT >= 1) {
TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
.detail("Version", data->version.get())
.detail("DurableVersion", data->durableVersion.get())
.detail("DesiredOldestVersion", data->desiredOldestVersion.get())
.detail("QueueSize", data->queueSize())
.detail("LastBytesInputEBrake", data->lastBytesInputEBrake)
.detail("LastDurableVersionEBrake", data->lastDurableVersionEBrake);
waitStartT = now();
}
// Stays set until the cursor has caught up with lastTLogVersion (cleared at the end of this function).
data->behind = true;
wait(delayJittered(.005, TaskPriority::TLogPeekReply));
}
// Record the baselines against which the next e-brake's overage is measured.
data->lastBytesInputEBrake = data->counters.bytesInput.getValue();
data->lastDurableVersionEBrake = data->durableVersion.get();
}
// Simulation-only fault injection for a TSS in EnabledAddDelay mode, active once tssFaultInjectTime has passed and
// the simulation is not being sped up.
if (g_network->isSimulated() && data->isTss() && g_simulator.tssMode == ISimulator::TSSMode::EnabledAddDelay &&
!g_simulator.speedUpSimulation && data->tssFaultInjectTime.present() &&
data->tssFaultInjectTime.get() < now()) {
if (deterministicRandom()->random01() < 0.01) {
TraceEvent(SevWarnAlways, "TSSInjectDelayForever", data->thisServerID).log();
// small random chance to just completely get stuck here, each tss should eventually hit this in this
// mode
wait(tssDelayForever());
} else {
// otherwise pause for part of a second
double delayTime = deterministicRandom()->random01();
TraceEvent(SevWarnAlways, "TSSInjectDelay", data->thisServerID).detail("Delay", delayTime);
wait(delay(delayTime));
}
}
// Do not pull new data while byteSampleClearsTooLarge is set.
while (data->byteSampleClearsTooLarge.get()) {
wait(data->byteSampleClearsTooLarge.onChange());
}
// Keep our own reference to the cursor in actor state; data->logCursor is only touched again at advanceTo() below.
state Reference<ILogSystem::IPeekCursor> cursor = data->logCursor;
state double beforeTLogCursorReads = now();
// Keep asking for more until the cursor reports it is not exhausted.
loop {
wait(cursor->getMore());
if (!cursor->isExhausted()) {
break;
}
}
data->tlogCursorReadsLatencyHistogram->sampleSeconds(now() - beforeTLogCursorReads);
// A non-zero popped() is treated as fatal for this worker (trace reason: PeekPoppedTLogData).
if (cursor->popped() > 0) {
TraceEvent("StorageServerWorkerRemoved", data->thisServerID).detail("Reason", "PeekPoppedTLogData");
throw worker_removed();
}
++data->counters.updateBatches;
data->lastTLogVersion = cursor->getMaxKnownVersion();
data->knownCommittedVersion = cursor->getMinKnownCommittedVersion();
// Lag is clamped at zero in case our version is already at or past the tlog's max known version.
data->versionLag = std::max<int64_t>(0, data->lastTLogVersion - data->version.get());
// Caller contract: the flag must be false on entry; it is set exactly once per call, here.
ASSERT(*pReceivedUpdate == false);
*pReceivedUpdate = true;
start = now();
// Acquire one permit of durableVersionLock; holdingDVL is a FlowLock::Releaser bound to that lock.
// NOTE(review): presumably the Releaser gives the permit back when this actor's state is destroyed - confirm.
wait(data->durableVersionLock.take(TaskPriority::TLogPeekReply, 1));
state FlowLock::Releaser holdingDVL(data->durableVersionLock);
if (now() - start > 0.1)
TraceEvent("SSSlowTakeLock1", data->thisServerID)
.detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken)
.detail("Duration", now() - start)
.detail("Version", data->version.get());
data->ssVersionLockLatencyHistogram->sampleSeconds(now() - start);
// Restart the timer: from here on it measures the eager-read phase.
start = now();
state UpdateEagerReadInfo eager;
state FetchInjectionInfo fii;
// Second clone of the cursor, used later by the apply pass; declared state because it must survive wait()s.
state Reference<ILogSystem::IPeekCursor> cloneCursor2;
// Pre-scan pass. Repeats until doEagerReads() completes without data->shardChangeCounter having changed.
loop {
state uint64_t changeCounter = data->shardChangeCounter;
bool epochEnd = false;
bool hasPrivateData = false;
bool firstMutation = true;
bool dbgLastMessageWasProtocol = false;
// cloneCursor1 is consumed by this scan; cloneCursor2 is left at the start for the apply pass.
Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
cloneCursor2 = cursor->cloneNoMore();
cloneCursor1->setProtocolVersion(data->logProtocol);
for (; cloneCursor1->hasMessage(); cloneCursor1->nextMessage()) {
ArenaReader& cloneReader = *cloneCursor1->reader();
if (LogProtocolMessage::isNextIn(cloneReader)) {
LogProtocolMessage lpm;
cloneReader >> lpm;
//TraceEvent(SevDebug, "SSReadingLPM", data->thisServerID).detail("Mutation", lpm);
dbgLastMessageWasProtocol = true;
cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
} else if (cloneReader.protocolVersion().hasSpanContext() &&
SpanContextMessage::isNextIn(cloneReader)) {
// Span context messages are read and discarded during the pre-scan.
SpanContextMessage scm;
cloneReader >> scm;
} else {
MutationRef msg;
cloneReader >> msg;
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);
// Only the first mutation of the batch is inspected: hasPrivateData is set when its key starts with
// systemKeys.end.
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
firstMutation = false;
// An epoch-end marker is required to directly follow a LogProtocolMessage (asserted).
if (msg.param1 == lastEpochEndPrivateKey) {
epochEnd = true;
ASSERT(dbgLastMessageWasProtocol);
}
eager.addMutation(msg);
dbgLastMessageWasProtocol = false;
}
}
// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in
// the middle of a rolled back version range.
// Note that the condition below also skips this step when hasPrivateData is set.
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this actor
// until it was completed.
}
// The injected changes need eager reads too.
for (auto& c : fii.changes)
eager.addMutations(c.mutations);
wait(doEagerReads(data, &eager));
if (data->shardChangeCounter == changeCounter)
break;
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it again.
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads only
// selectively
// Only `eager` is reset for the retry; `fii` keeps whatever has been injected so far.
eager = UpdateEagerReadInfo();
}
data->eagerReadsLatencyHistogram->sampleSeconds(now() - start);
if (now() - start > 0.1)
TraceEvent("SSSlowTakeLock2", data->thisServerID)
.detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken)
.detail("Duration", now() - start)
.detail("Version", data->version.get());
// Publish the eager-read results and mark the apply phase as in progress; both are reset after the apply loops.
data->updateEagerReads = &eager;
data->debug_inApplyUpdate = true;
state StorageUpdater updater(data->lastVersionWithData, data->restoredVersion);
if (EXPENSIVE_VALIDATION)
data->data().atLatest().validate();
validate(data);
// Apply pass, part 1: changes injected by fetchKeys through `fii`.
// The loop indices are `state` so that they survive the wait() inside the loop body.
state bool injectedChanges = false;
state int changeNum = 0;
// Bytes applied since the last yield; shared by both apply loops.
state int mutationBytes = 0;
state double beforeFetchKeysUpdates = now();
for (; changeNum < fii.changes.size(); changeNum++) {
state int mutationNum = 0;
state VerUpdateRef* pUpdate = &fii.changes[changeNum];
for (; mutationNum < pUpdate->mutations.size(); mutationNum++) {
updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version, true);
mutationBytes += pUpdate->mutations[mutationNum].totalSize();
// data->counters.mutationBytes or data->counters.mutations should not be updated because they should
// have counted when the mutations arrive from cursor initially.
injectedChanges = true;
// Yield for UPDATE_DELAY after every DESIRED_UPDATE_BYTES worth of applied mutations.
if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
mutationBytes = 0;
wait(delay(SERVER_KNOBS->UPDATE_DELAY));
}
}
}
data->fetchKeysPTreeUpdatesLatencyHistogram->sampleSeconds(now() - beforeFetchKeysUpdates);
// Apply pass, part 2: messages peeked from the log, read through cloneCursor2.
// `ver` is the version of the messages currently being applied; it stays invalidVersion until the first message
// whose version is newer than both `ver` and data->version.
state Version ver = invalidVersion;
cloneCursor2->setProtocolVersion(data->logProtocol);
state SpanID spanContext = SpanID();
state double beforeTLogMsgsUpdates = now();
// Change feeds touched during this call; their newMutations triggers fire after data->version is advanced.
state std::set<Key> updatedChangeFeeds;
for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
mutationBytes = 0;
// Instead of just yielding, leave time for the storage server to respond to reads
wait(delay(SERVER_KNOBS->UPDATE_DELAY));
}
// Any version newer than the one being applied must also be newer than the server's current version.
if (cloneCursor2->version().version > ver) {
ASSERT(cloneCursor2->version().version > data->version.get());
}
auto& rd = *cloneCursor2->reader();
// Version boundary: count it, flush the change feeds accumulated under the previous `ver`, then switch to the
// new version.
// NOTE(review): on the first boundary `ver` is still invalidVersion when the flush runs; if currentChangeFeeds can
// already be non-empty at that point (e.g. from the injected changes above), it is recorded with that value - confirm.
if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) {
++data->counters.updateVersions;
if (data->currentChangeFeeds.size()) {
data->changeFeedVersions.emplace_back(
std::vector<Key>(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver);
updatedChangeFeeds.insert(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end());
data->currentChangeFeeds.clear();
}
ver = cloneCursor2->version().version;
}
if (LogProtocolMessage::isNextIn(rd)) {
// Protocol change: record it on the server, in storage (tagged with `ver`) and on the cursor, and reset the
// span context.
LogProtocolMessage lpm;
rd >> lpm;
data->logProtocol = rd.protocolVersion();
data->storage.changeLogProtocol(ver, data->logProtocol);
cloneCursor2->setProtocolVersion(rd.protocolVersion());
spanContext = UID();
} else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) {
// Remember the span context so that the mutations that follow are attributed to it.
SpanContextMessage scm;
rd >> scm;
spanContext = scm.spanContext;
} else {
MutationRef msg;
rd >> msg;
Span span("SS:update"_loc, { spanContext });
span.addTag("key"_sr, msg.param1);
// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
// quarantine.
// The branches below are, in order:
//   1. simulated TSS in EnabledDropMutations mode: drop a SetValue/ClearRange whose key does not start with
//      the bytes 0xff 0xff when random01() < 0.05;
//   2. TSS in quarantine: drop any mutation whose key does not start with 0xff 0xff;
//   3. `ver` is set: apply the mutation and update the counters;
//   4. otherwise: log DiscardingPeekedData at SevError and do not apply the mutation.
// NOTE(review): the trailing comment on branch 3 ("belongs to a version < minVersion") appears to describe
// branch 4 rather than the branch it sits on - confirm.
if (g_network->isSimulated() && data->isTss() && !g_simulator.speedUpSimulation &&
g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations &&
data->tssFaultInjectTime.present() && data->tssFaultInjectTime.get() < now() &&
(msg.type == MutationRef::SetValue || msg.type == MutationRef::ClearRange) &&
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff) &&
deterministicRandom()->random01() < 0.05) {
TraceEvent(SevWarnAlways, "TSSInjectDropMutation", data->thisServerID)
.detail("Mutation", msg)
.detail("Version", cloneCursor2->version().toString());
} else if (data->isTSSInQuarantine() &&
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff)) {
TraceEvent("TSSQuarantineDropMutation", data->thisServerID)
.suppressFor(10.0)
.detail("Version", cloneCursor2->version().toString());
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
if (ver == 1) {
//TraceEvent("SSPeekMutation", data->thisServerID).log();
// The following trace event may produce a value with special characters
TraceEvent("SSPeekMutation", data->thisServerID)
.detail("Mutation", msg)
.detail("Version", cloneCursor2->version().toString());
}
updater.applyMutation(data, msg, ver, false);
mutationBytes += msg.totalSize();
data->counters.mutationBytes += msg.totalSize();
data->counters.logicalBytesInput += msg.expectedSize();
++data->counters.mutations;
// Per-type counters; mutation types not listed here only count towards `mutations`.
switch (msg.type) {
case MutationRef::SetValue:
++data->counters.setMutations;
break;
case MutationRef::ClearRange:
++data->counters.clearRangeMutations;
break;
case MutationRef::AddValue:
case MutationRef::And:
case MutationRef::AndV2:
case MutationRef::AppendIfFits:
case MutationRef::ByteMax:
case MutationRef::ByteMin:
case MutationRef::Max:
case MutationRef::Min:
case MutationRef::MinV2:
case MutationRef::Or:
case MutationRef::Xor:
case MutationRef::CompareAndClear:
++data->counters.atomicMutations;
break;
}
} else
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID)
.detail("Mutation", msg)
.detail("Version", cloneCursor2->version().toString());
}
}
data->tLogMsgsPTreeUpdatesLatencyHistogram->sampleSeconds(now() - beforeTLogMsgsUpdates);
// Flush the change feeds accumulated under the last version seen in the loop.
if (data->currentChangeFeeds.size()) {
data->changeFeedVersions.emplace_back(
std::vector<Key>(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver);
updatedChangeFeeds.insert(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end());
data->currentChangeFeeds.clear();
}
// `ver` still holds the last version for which messages were seen, if any.
if (ver != invalidVersion) {
data->lastVersionWithData = ver;
}
// From here on `ver` is the version the server will advance to: one less than where cloneCursor2 now stands.
// NOTE(review): presumably cloneCursor2->version() is the first version not yet consumed - confirm.
ver = cloneCursor2->version().version - 1;
if (injectedChanges)
data->lastVersionWithData = ver;
// End of the apply phase: `eager` must no longer be referenced through `data`.
data->updateEagerReads = nullptr;
data->debug_inApplyUpdate = false;
// Fall back to the updater's version when the cursor gave none but fetchKeys injected changes.
if (ver == invalidVersion && !fii.changes.empty()) {
ver = updater.currentVersion;
}
// Only advance when there is a valid version strictly newer than the current one.
if (ver != invalidVersion && ver > data->version.get()) {
// TODO(alexmiller): Update to version tracking.
// DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef());
data->mutableData().createNewVersion(ver);
// NOTE(review): get() on a ready otherError future presumably rethrows the stored error - confirm.
if (data->otherError.getFuture().isReady())
data->otherError.getFuture().get();
data->counters.fetchedVersions += (ver - data->version.get());
++data->counters.fetchesFromLogs;
// Trace (and track as latest) whenever the tlog we are peeking from changes.
Optional<UID> curSourceTLogID = cursor->getCurrentPeekLocation();
if (curSourceTLogID != data->sourceTLogID) {
data->sourceTLogID = curSourceTLogID;
TraceEvent("StorageServerSourceTLogID", data->thisServerID)
.detail("SourceTLogID",
data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown")
.trackLatest(data->storageServerSourceTLogIDEventHolder->trackingKey);
}
data->noRecentUpdates.set(false);
data->lastUpdate = now();
data->prevVersion = data->version.get();
data->version.set(ver); // Triggers replies to waiting gets for new version(s)
// Notify every change feed touched in this call that is still registered in uidChangeFeed.
for (auto& it : updatedChangeFeeds) {
auto feed = data->uidChangeFeed.find(it);
if (feed != data->uidChangeFeed.end()) {
feed->second->newMutations.trigger();
}
}
setDataVersion(data->thisServerID, data->version.get());
if (data->otherError.getFuture().isReady())
data->otherError.getFuture().get();
// Versions to keep in memory: the read-transaction lifetime plus the size of every pending recovery skip.
Version maxVersionsInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
for (int i = 0; i < data->recoveryVersionSkips.size(); i++) {
maxVersionsInMemory += data->recoveryVersionSkips[i].second;
}
// Trigger updateStorage if necessary
// Start from max(our version, min known committed version) - maxVersionsInMemory; when primaryLocality is
// tagLocalitySpecial or equals this server's tag locality, also consider lastTLogVersion - maxVersionsInMemory.
// Then clamp: at most version - 1, and never below the current oldestVersion or desiredOldestVersion.
Version proposedOldestVersion =
std::max(data->version.get(), cursor->getMinKnownCommittedVersion()) - maxVersionsInMemory;
if (data->primaryLocality == tagLocalitySpecial || data->tag.locality == data->primaryLocality) {
proposedOldestVersion = std::max(proposedOldestVersion, data->lastTLogVersion - maxVersionsInMemory);
}
proposedOldestVersion = std::min(proposedOldestVersion, data->version.get() - 1);
proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());
//TraceEvent("StorageServerUpdated", data->thisServerID).detail("Ver", ver).detail("DataVersion", data->version.get())
// .detail("LastTLogVersion", data->lastTLogVersion).detail("NewOldest",
// data->oldestVersion.get()).detail("DesiredOldest",data->desiredOldestVersion.get())
// .detail("MaxVersionInMemory", maxVersionsInMemory).detail("Proposed",
// proposedOldestVersion).detail("PrimaryLocality", data->primaryLocality).detail("Tag",
// data->tag.toString());
// Forget recovery skips whose version (.first) is already below the proposed oldest version.
while (!data->recoveryVersionSkips.empty() &&
proposedOldestVersion > data->recoveryVersionSkips.front().first) {
data->recoveryVersionSkips.pop_front();
}
data->desiredOldestVersion.set(proposedOldestVersion);
}
validate(data);
// Move the server's real cursor to the position the apply pass reached.
data->logCursor->advanceTo(cloneCursor2->version());
// Once the cursor has reached the tlog's max known version, the server is no longer considered behind.
if (cursor->version().version >= data->lastTLogVersion) {
if (data->behind) {
TraceEvent("StorageServerNoLongerBehind", data->thisServerID)
.detail("CursorVersion", cursor->version().version)
.detail("TLogVersion", data->lastTLogVersion);
}
data->behind = false;
}
return Void(); // update will get called again ASAP
} catch (Error& err) {
// Copy the error into actor state so that it is still available after the wait() below.
state Error e = err;
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
} else if (e.code() == error_code_please_reboot) {
// Let the in-progress durability work finish before propagating the reboot request.
wait(data->durableInProgress);
}
throw e;
}
}