ACTOR Future<Void> update()

in fdbserver/storageserver.actor.cpp [5096:5514]


// Runs one round of: wait for messages on the storage server's log cursor, then apply them (plus any
// changes injected by ready fetchKeys) through a StorageUpdater and publish the resulting version.
//
// Outline of a single call, in order:
//   1. E-brake: while the queue is at or above STORAGE_HARD_LIMIT_BYTES and durableVersion is behind
//      desiredOldestVersion, poll with a short jittered delay before doing anything else.
//   2. Optional simulation-only delay injection when data->isTss() is true.
//   3. Wait until byteSampleClearsTooLarge is false, then wait for the cursor to have more data.
//   4. Take durableVersionLock, pre-scan the peeked messages on one cursor clone to collect eager reads,
//      and run doEagerReads (repeating if shardChangeCounter changed meanwhile).
//   5. Apply injected fetchKeys changes, then apply the peeked messages from a second cursor clone.
//   6. If the resulting version is newer than data->version, create the new version, set data->version,
//      trigger change feeds, and recompute desiredOldestVersion.
//   7. Advance data->logCursor to where the second clone stopped.
//
// data:            the storage server whose state is read and updated.
// pReceivedUpdate: must point to false on entry to step 4 (asserted); set to true once the cursor has
//                  returned data and the popped() check has passed.
//
// Returns Void on success. Throws worker_removed() if cursor->popped() > 0. Any Error raised inside is
// rethrown; errors other than worker_removed and please_reboot are first logged as SevError
// "SSUpdateError", and please_reboot first waits on data->durableInProgress.
ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
	state double start;
	try {
		// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
		// This is often referred to as the storage server e-brake (emergency brake)

		// We allow the storage server to make some progress between e-brake periods, referred to as "overage", in
		// order to ensure that it advances desiredOldestVersion enough for updateStorage to make enough progress on
		// freeing up queue size.

		// Time the StorageServerUpdateLag trace was last emitted; starting at 0 means the first pass through the
		// loop below logs (as long as now() >= 1), and subsequent traces are at least one second apart.
		state double waitStartT = 0;
		if (data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES &&
		    data->durableVersion.get() < data->desiredOldestVersion.get() &&
		    ((data->desiredOldestVersion.get() - SERVER_KNOBS->STORAGE_HARD_LIMIT_VERSION_OVERAGE >
		      data->lastDurableVersionEBrake) ||
		     (data->counters.bytesInput.getValue() - SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES_OVERAGE >
		      data->lastBytesInputEBrake))) {

			// Note the loop condition is only the first two clauses of the entry condition: once braking has
			// started, the overage allowances no longer matter until the queue or durable version recovers.
			while (data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES &&
			       data->durableVersion.get() < data->desiredOldestVersion.get()) {
				if (now() - waitStartT >= 1) {
					TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
					    .detail("Version", data->version.get())
					    .detail("DurableVersion", data->durableVersion.get())
					    .detail("DesiredOldestVersion", data->desiredOldestVersion.get())
					    .detail("QueueSize", data->queueSize())
					    .detail("LastBytesInputEBrake", data->lastBytesInputEBrake)
					    .detail("LastDurableVersionEBrake", data->lastDurableVersionEBrake);
					waitStartT = now();
				}

				data->behind = true;
				wait(delayJittered(.005, TaskPriority::TLogPeekReply));
			}
			// Record where this braking period ended; the overage checks above are measured from these values.
			data->lastBytesInputEBrake = data->counters.bytesInput.getValue();
			data->lastDurableVersionEBrake = data->durableVersion.get();
		}

		// Delay injection: only in simulation, only when data->isTss(), only in EnabledAddDelay mode, and only
		// after tssFaultInjectTime has passed.
		if (g_network->isSimulated() && data->isTss() && g_simulator.tssMode == ISimulator::TSSMode::EnabledAddDelay &&
		    !g_simulator.speedUpSimulation && data->tssFaultInjectTime.present() &&
		    data->tssFaultInjectTime.get() < now()) {
			if (deterministicRandom()->random01() < 0.01) {
				TraceEvent(SevWarnAlways, "TSSInjectDelayForever", data->thisServerID).log();
				// small random chance to just completely get stuck here, each tss should eventually hit this in this
				// mode
				wait(tssDelayForever());
			} else {
				// otherwise pause for part of a second
				double delayTime = deterministicRandom()->random01();
				TraceEvent(SevWarnAlways, "TSSInjectDelay", data->thisServerID).detail("Delay", delayTime);
				wait(delay(delayTime));
			}
		}

		// Do not start peeking while byteSampleClearsTooLarge is set.
		while (data->byteSampleClearsTooLarge.get()) {
			wait(data->byteSampleClearsTooLarge.onChange());
		}

		// Keep our own reference to the cursor that data->logCursor refers to at this point.
		state Reference<ILogSystem::IPeekCursor> cursor = data->logCursor;

		// Keep asking the cursor for more until it reports that it is not exhausted; the elapsed time is
		// sampled into tlogCursorReadsLatencyHistogram.
		state double beforeTLogCursorReads = now();
		loop {
			wait(cursor->getMore());
			if (!cursor->isExhausted()) {
				break;
			}
		}
		data->tlogCursorReadsLatencyHistogram->sampleSeconds(now() - beforeTLogCursorReads);
		// A non-zero popped() is reported as "PeekPoppedTLogData" and ends this actor with worker_removed.
		if (cursor->popped() > 0) {
			TraceEvent("StorageServerWorkerRemoved", data->thisServerID).detail("Reason", "PeekPoppedTLogData");
			throw worker_removed();
		}

		++data->counters.updateBatches;
		data->lastTLogVersion = cursor->getMaxKnownVersion();
		data->knownCommittedVersion = cursor->getMinKnownCommittedVersion();
		// Lag is clamped at zero so it never goes negative when data->version is ahead of lastTLogVersion.
		data->versionLag = std::max<int64_t>(0, data->lastTLogVersion - data->version.get());

		// Tell the caller that this round got past the peek; the flag must not already be set.
		ASSERT(*pReceivedUpdate == false);
		*pReceivedUpdate = true;

		start = now();
		wait(data->durableVersionLock.take(TaskPriority::TLogPeekReply, 1));
		// NOTE(review): holdingDVL presumably releases durableVersionLock when it is destroyed - confirm
		// against FlowLock::Releaser.
		state FlowLock::Releaser holdingDVL(data->durableVersionLock);
		if (now() - start > 0.1)
			TraceEvent("SSSlowTakeLock1", data->thisServerID)
			    .detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken)
			    .detail("Duration", now() - start)
			    .detail("Version", data->version.get());
		data->ssVersionLockLatencyHistogram->sampleSeconds(now() - start);

		// From here `start` times the eager-read phase (sampled into eagerReadsLatencyHistogram below).
		start = now();
		state UpdateEagerReadInfo eager;
		state FetchInjectionInfo fii;
		state Reference<ILogSystem::IPeekCursor> cloneCursor2;

		// Pre-scan phase. Each iteration takes two fresh clones of the cursor: cloneCursor1 is walked here to
		// collect the mutations into `eager`, cloneCursor2 is kept for the apply phase further down. The
		// iteration is repeated if shardChangeCounter moved while doEagerReads was running.
		loop {
			// Snapshot used after doEagerReads to detect a shard change during the wait.
			state uint64_t changeCounter = data->shardChangeCounter;
			bool epochEnd = false;
			bool hasPrivateData = false;
			bool firstMutation = true;
			bool dbgLastMessageWasProtocol = false;

			Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
			cloneCursor2 = cursor->cloneNoMore();

			cloneCursor1->setProtocolVersion(data->logProtocol);

			for (; cloneCursor1->hasMessage(); cloneCursor1->nextMessage()) {
				ArenaReader& cloneReader = *cloneCursor1->reader();

				if (LogProtocolMessage::isNextIn(cloneReader)) {
					LogProtocolMessage lpm;
					cloneReader >> lpm;
					//TraceEvent(SevDebug, "SSReadingLPM", data->thisServerID).detail("Mutation", lpm);
					dbgLastMessageWasProtocol = true;
					cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
				} else if (cloneReader.protocolVersion().hasSpanContext() &&
				           SpanContextMessage::isNextIn(cloneReader)) {
					// Span context messages are read and discarded during the pre-scan.
					SpanContextMessage scm;
					cloneReader >> scm;
				} else {
					MutationRef msg;
					cloneReader >> msg;
					// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);

					// Only the very first mutation of the batch is checked for a key beyond systemKeys.end.
					if (firstMutation && msg.param1.startsWith(systemKeys.end))
						hasPrivateData = true;
					firstMutation = false;

					// The epoch-end marker is expected to directly follow a LogProtocolMessage.
					if (msg.param1 == lastEpochEndPrivateKey) {
						epochEnd = true;
						ASSERT(dbgLastMessageWasProtocol);
					}

					eager.addMutation(msg);
					dbgLastMessageWasProtocol = false;
				}
			}

			// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
			// If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in
			// the middle of a rolled back version range.
			while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
				auto fk = data->readyFetchKeys.back();
				data->readyFetchKeys.pop_back();
				fk.send(&fii);
				// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this actor
				// until it was completed.
			}

			// The injected fetchKeys mutations need eager reads too.
			for (auto& c : fii.changes)
				eager.addMutations(c.mutations);

			wait(doEagerReads(data, &eager));
			if (data->shardChangeCounter == changeCounter)
				break;
			TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated.  Read it again.
			// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads only
			// selectively
			// Note that only `eager` is reset for the retry; `fii` keeps whatever was already injected.
			eager = UpdateEagerReadInfo();
		}
		data->eagerReadsLatencyHistogram->sampleSeconds(now() - start);

		// Despite the event name, `start` was reset after the lock was taken, so this duration covers the
		// pre-scan / eager-read loop above.
		if (now() - start > 0.1)
			TraceEvent("SSSlowTakeLock2", data->thisServerID)
			    .detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken)
			    .detail("Duration", now() - start)
			    .detail("Version", data->version.get());

		// Publish the eager-read results on `data` for the duration of the apply phase; both fields are
		// reset once the apply loops below have finished.
		data->updateEagerReads = &eager;
		data->debug_inApplyUpdate = true;

		state StorageUpdater updater(data->lastVersionWithData, data->restoredVersion);

		if (EXPENSIVE_VALIDATION)
			data->data().atLatest().validate();
		validate(data);

		// Apply phase, part 1: changes injected by fetchKeys. These are applied with the last argument of
		// applyMutation set to true (the log-cursor messages below pass false).
		// NOTE(review): the loop counters are `state`, presumably because the loop body contains a wait().
		state bool injectedChanges = false;
		state int changeNum = 0;
		state int mutationBytes = 0;
		state double beforeFetchKeysUpdates = now();
		for (; changeNum < fii.changes.size(); changeNum++) {
			state int mutationNum = 0;
			state VerUpdateRef* pUpdate = &fii.changes[changeNum];
			for (; mutationNum < pUpdate->mutations.size(); mutationNum++) {
				updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version, true);
				mutationBytes += pUpdate->mutations[mutationNum].totalSize();
				// data->counters.mutationBytes or data->counters.mutations should not be updated because they should
				// have counted when the mutations arrive from cursor initially.
				injectedChanges = true;
				// After more than DESIRED_UPDATE_BYTES have been applied, pause for UPDATE_DELAY and restart the
				// byte count.
				if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
					mutationBytes = 0;
					wait(delay(SERVER_KNOBS->UPDATE_DELAY));
				}
			}
		}
		data->fetchKeysPTreeUpdatesLatencyHistogram->sampleSeconds(now() - beforeFetchKeysUpdates);

		// Apply phase, part 2: messages from the log cursor, read through cloneCursor2.
		// `ver` is the version the messages currently being applied belong to; it stays invalidVersion until
		// the first message whose version is above both `ver` and data->version is seen.
		state Version ver = invalidVersion;
		cloneCursor2->setProtocolVersion(data->logProtocol);
		state SpanID spanContext = SpanID();
		state double beforeTLogMsgsUpdates = now();
		// Change feeds touched in this batch; their newMutations are triggered after data->version is set.
		state std::set<Key> updatedChangeFeeds;
		for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
			// mutationBytes carries over from the fetchKeys loop above, so the byte budget is shared.
			if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
				mutationBytes = 0;
				// Instead of just yielding, leave time for the storage server to respond to reads
				wait(delay(SERVER_KNOBS->UPDATE_DELAY));
			}

			// Whenever the cursor moves past `ver`, the new version must also be past data->version.
			if (cloneCursor2->version().version > ver) {
				ASSERT(cloneCursor2->version().version > data->version.get());
			}

			auto& rd = *cloneCursor2->reader();

			// Entering a new version: count it, record the change feeds collected so far against the
			// previous `ver`, and only then advance `ver`.
			if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) {
				++data->counters.updateVersions;
				if (data->currentChangeFeeds.size()) {
					data->changeFeedVersions.emplace_back(
					    std::vector<Key>(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver);
					updatedChangeFeeds.insert(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end());
					data->currentChangeFeeds.clear();
				}
				ver = cloneCursor2->version().version;
			}

			if (LogProtocolMessage::isNextIn(rd)) {
				LogProtocolMessage lpm;
				rd >> lpm;

				// Propagate the protocol change to the server state, the storage, and the cursor, and drop
				// the current span context.
				data->logProtocol = rd.protocolVersion();
				data->storage.changeLogProtocol(ver, data->logProtocol);
				cloneCursor2->setProtocolVersion(rd.protocolVersion());
				spanContext = UID();
			} else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) {
				// Remember the span context; it becomes the parent of the span created for following mutations.
				SpanContextMessage scm;
				rd >> scm;
				spanContext = scm.spanContext;
			} else {
				MutationRef msg;
				rd >> msg;

				Span span("SS:update"_loc, { spanContext });
				span.addTag("key"_sr, msg.param1);

				// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
				// quarantine.
				// In both drop branches a mutation is only eligible when param1 does not begin with the two bytes
				// 0xff 0xff. The simulated drop additionally requires a SetValue/ClearRange and a 5% random draw.
				if (g_network->isSimulated() && data->isTss() && !g_simulator.speedUpSimulation &&
				    g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations &&
				    data->tssFaultInjectTime.present() && data->tssFaultInjectTime.get() < now() &&
				    (msg.type == MutationRef::SetValue || msg.type == MutationRef::ClearRange) &&
				    (msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff) &&
				    deterministicRandom()->random01() < 0.05) {
					TraceEvent(SevWarnAlways, "TSSInjectDropMutation", data->thisServerID)
					    .detail("Mutation", msg)
					    .detail("Version", cloneCursor2->version().toString());
				} else if (data->isTSSInQuarantine() &&
				           (msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff)) {
					TraceEvent("TSSQuarantineDropMutation", data->thisServerID)
					    .suppressFor(10.0)
					    .detail("Version", cloneCursor2->version().toString());
				} else if (ver != invalidVersion) { // A version has been established for this message: apply it
					DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
					// This trace is emitted only for mutations at version 1.
					if (ver == 1) {
						//TraceEvent("SSPeekMutation", data->thisServerID).log();
						// The following trace event may produce a value with special characters
						TraceEvent("SSPeekMutation", data->thisServerID)
						    .detail("Mutation", msg)
						    .detail("Version", cloneCursor2->version().toString());
					}

					updater.applyMutation(data, msg, ver, false);
					mutationBytes += msg.totalSize();
					data->counters.mutationBytes += msg.totalSize();
					data->counters.logicalBytesInput += msg.expectedSize();
					++data->counters.mutations;
					// Per-type counters; a type not listed here increments none of the three.
					switch (msg.type) {
					case MutationRef::SetValue:
						++data->counters.setMutations;
						break;
					case MutationRef::ClearRange:
						++data->counters.clearRangeMutations;
						break;
					case MutationRef::AddValue:
					case MutationRef::And:
					case MutationRef::AndV2:
					case MutationRef::AppendIfFits:
					case MutationRef::ByteMax:
					case MutationRef::ByteMin:
					case MutationRef::Max:
					case MutationRef::Min:
					case MutationRef::MinV2:
					case MutationRef::Or:
					case MutationRef::Xor:
					case MutationRef::CompareAndClear:
						++data->counters.atomicMutations;
						break;
					}
				} else
					// `ver` is still invalidVersion here, so the mutation is not applied, only reported at
					// SevError. (The original note for this situation read: "This change belongs to a version <
					// minVersion"; NOTE(review): no minVersion is visible in this function.)
					TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID)
					    .detail("Mutation", msg)
					    .detail("Version", cloneCursor2->version().toString());
			}
		}
		data->tLogMsgsPTreeUpdatesLatencyHistogram->sampleSeconds(now() - beforeTLogMsgsUpdates);
		// Record change feeds collected for the final version of the batch (the in-loop flush only runs when
		// a later version begins).
		if (data->currentChangeFeeds.size()) {
			data->changeFeedVersions.emplace_back(
			    std::vector<Key>(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver);
			updatedChangeFeeds.insert(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end());
			data->currentChangeFeeds.clear();
		}

		// lastVersionWithData is the last version for which a message was seen in this batch ...
		if (ver != invalidVersion) {
			data->lastVersionWithData = ver;
		}
		// ... whereas the version to publish is one below where the cursor clone now stands.
		ver = cloneCursor2->version().version - 1;

		// If fetchKeys changes were applied, lastVersionWithData is moved up to the published version too.
		if (injectedChanges)
			data->lastVersionWithData = ver;

		// Apply phase finished: stop exposing `eager` through data.
		data->updateEagerReads = nullptr;
		data->debug_inApplyUpdate = false;

		// Fallback when the cursor-derived version is invalidVersion but fetchKeys injected changes.
		if (ver == invalidVersion && !fii.changes.empty()) {
			ver = updater.currentVersion;
		}

		// Publish the new version only if it moves data->version forward.
		if (ver != invalidVersion && ver > data->version.get()) {
			// TODO(alexmiller): Update to version tracking.
			// DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef());

			data->mutableData().createNewVersion(ver);
			// NOTE(review): calling get() on an already-ready otherError future presumably rethrows a stored
			// error - confirm against Future::get().
			if (data->otherError.getFuture().isReady())
				data->otherError.getFuture().get();

			data->counters.fetchedVersions += (ver - data->version.get());
			++data->counters.fetchesFromLogs;
			Optional<UID> curSourceTLogID = cursor->getCurrentPeekLocation();

			// Trace (and remember) the peek location only when it differs from the one last recorded.
			if (curSourceTLogID != data->sourceTLogID) {
				data->sourceTLogID = curSourceTLogID;

				TraceEvent("StorageServerSourceTLogID", data->thisServerID)
				    .detail("SourceTLogID",
				            data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown")
				    .trackLatest(data->storageServerSourceTLogIDEventHolder->trackingKey);
			}

			data->noRecentUpdates.set(false);
			data->lastUpdate = now();

			data->prevVersion = data->version.get();
			data->version.set(ver); // Triggers replies to waiting gets for new version(s)

			// Wake the change feeds touched in this batch, skipping any key no longer in uidChangeFeed.
			for (auto& it : updatedChangeFeeds) {
				auto feed = data->uidChangeFeed.find(it);
				if (feed != data->uidChangeFeed.end()) {
					feed->second->newMutations.trigger();
				}
			}

			setDataVersion(data->thisServerID, data->version.get());
			if (data->otherError.getFuture().isReady())
				data->otherError.getFuture().get();

			// Allowed version window: MAX_READ_TRANSACTION_LIFE_VERSIONS plus the `second` of every entry in
			// recoveryVersionSkips.
			Version maxVersionsInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
			for (int i = 0; i < data->recoveryVersionSkips.size(); i++) {
				maxVersionsInMemory += data->recoveryVersionSkips[i].second;
			}

			// Trigger updateStorage if necessary
			// Start from max(version, min known committed version) minus the window; when the locality check
			// passes, also consider lastTLogVersion minus the window. The result is then capped at version - 1
			// and raised to at least oldestVersion and the current desiredOldestVersion, so it never decreases.
			Version proposedOldestVersion =
			    std::max(data->version.get(), cursor->getMinKnownCommittedVersion()) - maxVersionsInMemory;
			if (data->primaryLocality == tagLocalitySpecial || data->tag.locality == data->primaryLocality) {
				proposedOldestVersion = std::max(proposedOldestVersion, data->lastTLogVersion - maxVersionsInMemory);
			}
			proposedOldestVersion = std::min(proposedOldestVersion, data->version.get() - 1);
			proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
			proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());

			//TraceEvent("StorageServerUpdated", data->thisServerID).detail("Ver", ver).detail("DataVersion", data->version.get())
			//	.detail("LastTLogVersion", data->lastTLogVersion).detail("NewOldest",
			// data->oldestVersion.get()).detail("DesiredOldest",data->desiredOldestVersion.get())
			//	.detail("MaxVersionInMemory", maxVersionsInMemory).detail("Proposed",
			// proposedOldestVersion).detail("PrimaryLocality", data->primaryLocality).detail("Tag",
			// data->tag.toString());

			// Discard recovery skips whose `first` is already below the proposed oldest version.
			while (!data->recoveryVersionSkips.empty() &&
			       proposedOldestVersion > data->recoveryVersionSkips.front().first) {
				data->recoveryVersionSkips.pop_front();
			}
			data->desiredOldestVersion.set(proposedOldestVersion);
		}

		validate(data);

		// Move the server's log cursor up to where the apply-phase clone stopped.
		data->logCursor->advanceTo(cloneCursor2->version());
		// Once the cursor has reached lastTLogVersion, the `behind` flag (set by the e-brake) is cleared.
		if (cursor->version().version >= data->lastTLogVersion) {
			if (data->behind) {
				TraceEvent("StorageServerNoLongerBehind", data->thisServerID)
				    .detail("CursorVersion", cursor->version().version)
				    .detail("TLogVersion", data->lastTLogVersion);
			}
			data->behind = false;
		}

		return Void(); // update will get called again ASAP
	} catch (Error& err) {
		// Copy the error into a state variable before the wait() below.
		state Error e = err;
		// worker_removed and please_reboot are not logged as errors; please_reboot additionally waits for
		// data->durableInProgress before the error is rethrown.
		if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
			TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
		} else if (e.code() == error_code_please_reboot) {
			wait(data->durableInProgress);
		}
		throw e;
	}
}