in fdbserver/workloads/ConsistencyCheck.actor.cpp [1138:1715]
ACTOR Future<bool> checkDataConsistency(Database cx,
VectorRef<KeyValueRef> keyLocations,
DatabaseConfiguration configuration,
std::map<UID, StorageServerInterface> tssMapping,
ConsistencyCheckWorkload* self) {
// Stores the total number of bytes on each storage server
// In a distributed test, this will be an estimated size
state std::map<UID, int64_t> storageServerSizes;
// Iterate through each shard, checking its values on all of its storage servers
// If shardSampleFactor > 1, then not all shards are processed
// Also, in a distributed data consistency check, each client processes a subset of the shards
// Note: this may cause some shards to be processed more than once or not at all in a non-quiescent database
state int effectiveClientCount = (self->distributed) ? self->clientCount : 1;
state int i = self->clientId * (self->shardSampleFactor + 1);
state int increment =
(self->distributed && !self->firstClient) ? effectiveClientCount * self->shardSampleFactor : 1;
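// Derive this round's rate limit from the bytes read in the previous round, aiming to finish a round
// in roughly CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME seconds, capped at rateLimitMax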
state int rateLimitForThisRound =
self->bytesReadInPreviousRound == 0
? self->rateLimitMax
: std::min(
self->rateLimitMax,
static_cast<int>(ceil(self->bytesReadInPreviousRound /
(float)CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME)));
ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= self->rateLimitMax);
TraceEvent("ConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound);
state Reference<IRateControl> rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
state double rateLimiterStartTime = now();
state int64_t bytesReadInThisRound = 0;
state double dbSize = 100e12;
if (g_network->isSimulated()) {
// This call will get all shard ranges in the database, which is too expensive on real clusters.
int64_t _dbSize = wait(self->getDatabaseSize(cx));
dbSize = _dbSize;
}
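// Build the list of shard ranges from consecutive boundary keys in keyLocations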
state std::vector<KeyRangeRef> ranges;
for (int k = 0; k < keyLocations.size() - 1; k++) {
KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key);
ranges.push_back(range);
}
state std::vector<int> shardOrder;
shardOrder.reserve(ranges.size());
for (int k = 0; k < ranges.size(); k++)
shardOrder.push_back(k);
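// Optionally shuffle the shard order; the seed is shared across clients (offset by the repetition
// count) so that every client visits shards in the same shuffled sequence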
if (self->shuffleShards) {
uint32_t seed = self->sharedRandomNumber + self->repetitions;
DeterministicRandom sharedRandom(seed == 0 ? 1 : seed);
sharedRandom.randomShuffle(shardOrder);
}
for (; i < ranges.size(); i += increment) {
state int shard = shardOrder[i];
state KeyRangeRef range = ranges[shard];
state std::vector<UID> sourceStorageServers;
state std::vector<UID> destStorageServers;
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state int bytesReadInRange = 0;
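// Read the server tag map and decode which storage servers currently hold this shard (sources)
// and which servers, if any, it is being moved to (destinations)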
RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
decodeKeyServersValue(
UIDtoTagMap, keyLocations[shard].value, sourceStorageServers, destStorageServers, false);
// If destStorageServers is non-empty, then this shard is being relocated
state bool isRelocating = destStorageServers.size() > 0;
// This check was disabled because we now disable data distribution during the consistency check,
// which can leave shards with dest storage servers.
// Disallow relocations in a quiescent database
/*if(self->firstClient && self->performQuiescentChecks && isRelocating)
{
TraceEvent("ConsistencyCheck_QuiescentShardRelocation").detail("ShardBegin", printable(range.start)).detail("ShardEnd", printable(range.end));
self->testFailure("Shard is being relocated in quiescent database");
return false;
}*/
// In a quiescent database, check that the team size is the same as the desired team size
if (self->firstClient && self->performQuiescentChecks &&
sourceStorageServers.size() != configuration.usableRegions * configuration.storageTeamSize) {
TraceEvent("ConsistencyCheck_InvalidTeamSize")
.detail("ShardBegin", printable(range.begin))
.detail("ShardEnd", printable(range.end))
.detail("SourceTeamSize", sourceStorageServers.size())
.detail("DestServerSize", destStorageServers.size())
.detail("ConfigStorageTeamSize", configuration.storageTeamSize)
.detail("UsableRegions", configuration.usableRegions);
// Record the servers responsible for the problematic shards
int i = 0;
for (auto& id : sourceStorageServers) {
TraceEvent("IncorrectSizeTeamInfo").detail("ServerUID", id).detail("TeamIndex", i++);
}
self->testFailure("Invalid team size");
return false;
}
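// If the shard is being relocated, check the destination team; otherwise check the source team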
state std::vector<UID> storageServers = (isRelocating) ? destStorageServers : sourceStorageServers;
state std::vector<StorageServerInterface> storageServerInterfaces;
//TraceEvent("ConsistencyCheck_GetStorageInfo").detail("StorageServers", storageServers.size());
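// Look up the StorageServerInterface for each team member from the server list, retrying on
// transaction errors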
loop {
try {
std::vector<Future<Optional<Value>>> serverListEntries;
serverListEntries.reserve(storageServers.size());
for (int s = 0; s < storageServers.size(); s++)
serverListEntries.push_back(tr.get(serverListKeyFor(storageServers[s])));
state std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
for (int s = 0; s < serverListValues.size(); s++) {
if (serverListValues[s].present())
storageServerInterfaces.push_back(decodeServerListValue(serverListValues[s].get()));
else if (self->performQuiescentChecks)
self->testFailure("/FF/serverList changing in a quiescent database");
}
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
// add TSS to end of list, if configured and if not relocating
if (!isRelocating && self->performTSSCheck) {
int initialSize = storageServers.size();
for (int i = 0; i < initialSize; i++) {
auto tssPair = tssMapping.find(storageServers[i]);
if (tssPair != tssMapping.end()) {
TEST(true); // TSS checked in consistency check
storageServers.push_back(tssPair->second.id());
storageServerInterfaces.push_back(tssPair->second);
}
}
}
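// Ask each storage server (including any TSS pairs added above) for its estimate of this shard's size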
state std::vector<int64_t> estimatedBytes =
wait(self->getStorageSizeEstimate(storageServerInterfaces, range));
// Gets permitted size range of shard
int64_t maxShardSize = getMaxShardSize(dbSize);
state ShardSizeBounds shardBounds = getShardSizeBounds(range, maxShardSize);
if (self->firstClient) {
// If there was an error retrieving shard estimated size
if (self->performQuiescentChecks && estimatedBytes.size() == 0)
self->testFailure("Error fetching storage metrics");
// If running a distributed test, storage server size is an accumulation of shard estimates
else if (self->distributed && self->firstClient)
for (int j = 0; j < storageServers.size(); j++)
storageServerSizes[storageServers[j]] += std::max(estimatedBytes[j], (int64_t)0);
}
// The first client may need to skip the rest of the loop contents if it is just processing this shard to
// get a size estimate
if (!self->firstClient || shard % (effectiveClientCount * self->shardSampleFactor) == 0) {
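// Per-shard accumulators: key/byte counts, byte-sample statistics, and split eligibility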
state int shardKeys = 0;
state int shardBytes = 0;
state int sampledBytes = 0;
state int splitBytes = 0;
state int firstKeySampledBytes = 0;
state int sampledKeys = 0;
state int sampledKeysWithProb = 0;
state double shardVariance = 0;
state bool canSplit = false;
state Key lastSampleKey;
state Key lastStartSampleKey;
state int64_t totalReadAmount = 0;
state KeySelector begin = firstGreaterOrEqual(range.begin);
state Transaction onErrorTr(
cx); // This transaction exists only to access onError and its backoff behavior
// Read a limited number of entries at a time, repeating until all keys in the shard have been read
loop {
try {
lastSampleKey = lastStartSampleKey;
// Get the min version of the storage servers
Version version = wait(self->getVersion(cx, self));
state GetKeyValuesRequest req;
req.begin = begin;
req.end = firstGreaterOrEqual(range.end);
req.limit = 1e4;
req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.version = version;
req.tags = TagSet();
// Try getting the entries in the specified range
state std::vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
state int j = 0;
for (j = 0; j < storageServerInterfaces.size(); j++) {
resetReply(req);
keyValueFutures.push_back(
storageServerInterfaces[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
wait(waitForAll(keyValueFutures));
// Read the resulting entries
state int firstValidServer = -1;
totalReadAmount = 0;
for (j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
// Compare the results with other storage servers
if (rangeResult.present() && !rangeResult.get().error.present()) {
state GetKeyValuesReply current = rangeResult.get();
totalReadAmount += current.data.expectedSize();
// If we haven't encountered a valid storage server yet, then mark this as the baseline
// to compare against
if (firstValidServer == -1)
firstValidServer = j;
// Compare this shard against the first
else {
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
if (current.data != reference.data || current.more != reference.more) {
// Be especially verbose if in simulation
if (g_network->isSimulated()) {
int invalidIndex = -1;
printf("\n%sSERVER %d (%s); shard = %s - %s:\n",
storageServerInterfaces[j].isTss() ? "TSS " : "",
j,
storageServerInterfaces[j].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < current.data.size(); k++) {
printf("%d. %s => %s\n",
k,
printable(current.data[k].key).c_str(),
printable(current.data[k].value).c_str());
if (invalidIndex < 0 &&
(k >= reference.data.size() ||
current.data[k].key != reference.data[k].key ||
current.data[k].value != reference.data[k].value))
invalidIndex = k;
}
printf(
"\n%sSERVER %d (%s); shard = %s - %s:\n",
storageServerInterfaces[firstValidServer].isTss() ? "TSS " : "",
firstValidServer,
storageServerInterfaces[firstValidServer].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < reference.data.size(); k++) {
printf("%d. %s => %s\n",
k,
printable(reference.data[k].key).c_str(),
printable(reference.data[k].value).c_str());
if (invalidIndex < 0 &&
(k >= current.data.size() ||
reference.data[k].key != current.data[k].key ||
reference.data[k].value != current.data[k].value))
invalidIndex = k;
}
printf("\nMISMATCH AT %d\n\n", invalidIndex);
}
// Data for trace event
// The number of keys unique to the current shard
int currentUniques = 0;
// The number of keys unique to the reference shard
int referenceUniques = 0;
// The number of keys in both shards with conflicting values
int valueMismatches = 0;
// The number of keys in both shards with matching values
int matchingKVPairs = 0;
// Last unique key on the current shard
KeyRef currentUniqueKey;
// Last unique key on the reference shard
KeyRef referenceUniqueKey;
// Last value mismatch
KeyRef valueMismatchKey;
// Loop indices
int currentI = 0;
int referenceI = 0;
while (currentI < current.data.size() || referenceI < reference.data.size()) {
if (currentI >= current.data.size()) {
referenceUniqueKey = reference.data[referenceI].key;
referenceUniques++;
referenceI++;
} else if (referenceI >= reference.data.size()) {
currentUniqueKey = current.data[currentI].key;
currentUniques++;
currentI++;
} else {
KeyValueRef currentKV = current.data[currentI];
KeyValueRef referenceKV = reference.data[referenceI];
if (currentKV.key == referenceKV.key) {
if (currentKV.value == referenceKV.value)
matchingKVPairs++;
else {
valueMismatchKey = currentKV.key;
valueMismatches++;
}
currentI++;
referenceI++;
} else if (currentKV.key < referenceKV.key) {
currentUniqueKey = currentKV.key;
currentUniques++;
currentI++;
} else {
referenceUniqueKey = referenceKV.key;
referenceUniques++;
referenceI++;
}
}
}
TraceEvent("ConsistencyCheck_DataInconsistent")
.detail(format("StorageServer%d", j).c_str(), storageServers[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
storageServers[firstValidServer].toString())
.detail("ShardBegin", req.begin.getKey())
.detail("ShardEnd", req.end.getKey())
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), currentUniqueKey)
.detail(format("Server%dUniques", firstValidServer).c_str(),
referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(),
referenceUniqueKey)
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", valueMismatchKey)
.detail("MatchingKVPairs", matchingKVPairs)
.detail("IsTSS",
storageServerInterfaces[j].isTss() ||
storageServerInterfaces[firstValidServer].isTss()
? "True"
: "False");
if ((g_network->isSimulated() &&
g_simulator.tssMode != ISimulator::TSSMode::EnabledDropMutations) ||
(!storageServerInterfaces[j].isTss() &&
!storageServerInterfaces[firstValidServer].isTss())) {
self->testFailure("Data inconsistent", true);
return false;
}
}
}
}
// If the data is not available and we aren't relocating this shard
else if (!isRelocating) {
Error e =
rangeResult.isError() ? rangeResult.getError() : rangeResult.get().error.get();
TraceEvent("ConsistencyCheck_StorageServerUnavailable")
.suppressFor(1.0)
.detail("StorageServer", storageServers[j])
.detail("ShardBegin", printable(range.begin))
.detail("ShardEnd", printable(range.end))
.detail("Address", storageServerInterfaces[j].address())
.detail("UID", storageServerInterfaces[j].id())
.detail("GetKeyValuesToken",
storageServerInterfaces[j].getKeyValues.getEndpoint().token)
.detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False")
.error(e);
// All shards should be available in quiescence
if (self->performQuiescentChecks && !storageServerInterfaces[j].isTss()) {
self->testFailure("Storage server unavailable");
return false;
}
}
}
if (firstValidServer >= 0) {
VectorRef<KeyValueRef> data = keyValueFutures[firstValidServer].get().get().data;
// Calculate the size of the shard, the variance of the shard size estimate, and the correct
// shard size estimate
for (int k = 0; k < data.size(); k++) {
ByteSampleInfo sampleInfo = isKeyValueInSample(data[k]);
shardBytes += sampleInfo.size;
double itemProbability = ((double)sampleInfo.size) / sampleInfo.sampledSize;
if (itemProbability < 1)
shardVariance += itemProbability * (1 - itemProbability) *
pow((double)sampleInfo.sampledSize, 2);
if (sampleInfo.inSample) {
sampledBytes += sampleInfo.sampledSize;
if (!canSplit && sampledBytes >= shardBounds.min.bytes &&
data[k].key.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT &&
sampledBytes <= shardBounds.max.bytes *
CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT / 2) {
canSplit = true;
splitBytes = sampledBytes;
}
/*TraceEvent("ConsistencyCheck_ByteSample").detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end))
.detail("SampledBytes", sampleInfo.sampledSize).detail("Key",
printable(data[k].key)).detail("KeySize", data[k].key.size()).detail("ValueSize",
data[k].value.size());*/
// In data distribution, the splitting process ignores the first key in a shard.
// Thus, we shouldn't consider it when validating the upper bound of estimated shard
// sizes
if (k == 0)
firstKeySampledBytes += sampleInfo.sampledSize;
sampledKeys++;
if (itemProbability < 1) {
sampledKeysWithProb++;
}
}
}
// Accumulate number of keys in this shard
shardKeys += data.size();
}
// after requesting each shard, enforce rate limit based on how much data will likely be read
if (rateLimitForThisRound > 0) {
wait(rateLimiter->getAllowance(totalReadAmount));
// Set ratelimit to max allowed if current round has been going on for a while
if (now() - rateLimiterStartTime >
1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME &&
rateLimitForThisRound != self->rateLimitMax) {
rateLimitForThisRound = self->rateLimitMax;
rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
rateLimiterStartTime = now();
TraceEvent(SevInfo, "ConsistencyCheck_RateLimitSetMaxForThisRound")
.detail("RateLimit", rateLimitForThisRound);
}
}
bytesReadInRange += totalReadAmount;
bytesReadInThisRound += totalReadAmount;
// Advance to the next set of entries
if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) {
VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
ASSERT(result.size() > 0);
begin = firstGreaterThan(result[result.size() - 1].key);
ASSERT(begin.getKey() != allKeys.end);
lastStartSampleKey = lastSampleKey;
} else
break;
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("ConsistencyCheck_RetryDataConsistency").error(err);
}
}
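// A shard is only considered splittable if the data remaining after the split point would still
// meet the minimum shard size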
canSplit = canSplit && sampledBytes - splitBytes >= shardBounds.min.bytes && sampledBytes > splitBytes;
// Update the size of all storage servers containing this shard
// This is only done in a non-distributed consistency check; the distributed check uses shard size
// estimates
if (!self->distributed)
for (int j = 0; j < storageServers.size(); j++)
storageServerSizes[storageServers[j]] += shardBytes;
// FIXME: Where is this intended to be used?
[[maybe_unused]] bool hasValidEstimate = estimatedBytes.size() > 0;
// If the storage servers' sampled estimate of shard size is different from ours
if (self->performQuiescentChecks) {
for (int j = 0; j < estimatedBytes.size(); j++) {
if (estimatedBytes[j] >= 0 && estimatedBytes[j] != sampledBytes) {
TraceEvent("ConsistencyCheck_IncorrectEstimate")
.detail("EstimatedBytes", estimatedBytes[j])
.detail("CorrectSampledBytes", sampledBytes)
.detail("StorageServer", storageServers[j])
.detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False");
if (!storageServerInterfaces[j].isTss()) {
self->testFailure("Storage servers had incorrect sampled estimate");
}
hasValidEstimate = false;
break;
} else if (estimatedBytes[j] < 0 &&
(g_network->isSimulated() || !storageServerInterfaces[j].isTss())) {
self->testFailure("Could not get storage metrics from server");
hasValidEstimate = false;
break;
}
}
}
// Compute the difference between the shard size estimate and its actual size. If it is sufficiently
// large, then fail
double stdDev = sqrt(shardVariance);
double failErrorNumStdDev = 7;
int estimateError = abs(shardBytes - sampledBytes);
// Only perform the check if there are sufficient keys to get a distribution that should resemble a
// normal distribution
if (sampledKeysWithProb > 30 && estimateError > failErrorNumStdDev * stdDev) {
double numStdDev = estimateError / sqrt(shardVariance);
TraceEvent("ConsistencyCheck_InaccurateShardEstimate")
.detail("Min", shardBounds.min.bytes)
.detail("Max", shardBounds.max.bytes)
.detail("Estimate", sampledBytes)
.detail("Actual", shardBytes)
.detail("NumStdDev", numStdDev)
.detail("Variance", shardVariance)
.detail("StdDev", stdDev)
.detail("ShardBegin", printable(range.begin))
.detail("ShardEnd", printable(range.end))
.detail("NumKeys", shardKeys)
.detail("NumSampledKeys", sampledKeys)
.detail("NumSampledKeysWithProb", sampledKeysWithProb);
self->testFailure(format("Shard size is more than %f std dev from estimate", failErrorNumStdDev));
}
// In a quiescent database, check that the (estimated) size of the shard is within permitted bounds.
// Min and max shard sizes have a 3 * shardBounds.permittedError.bytes cushion for error since shard
// sizes are not precise. Shard splits ignore the first key in a shard, so its size shouldn't be
// considered when checking the upper bound. 0xff shards are not checked.
if (canSplit && sampledKeys > 5 && self->performQuiescentChecks &&
!range.begin.startsWith(keyServersPrefix) &&
(sampledBytes < shardBounds.min.bytes - 3 * shardBounds.permittedError.bytes ||
sampledBytes - firstKeySampledBytes >
shardBounds.max.bytes + 3 * shardBounds.permittedError.bytes)) {
TraceEvent("ConsistencyCheck_InvalidShardSize")
.detail("Min", shardBounds.min.bytes)
.detail("Max", shardBounds.max.bytes)
.detail("Size", shardBytes)
.detail("EstimatedSize", sampledBytes)
.detail("ShardBegin", printable(range.begin))
.detail("ShardEnd", printable(range.end))
.detail("ShardCount", ranges.size())
.detail("SampledKeys", sampledKeys);
self->testFailure(format("Shard size in quiescent database is too %s",
(sampledBytes < shardBounds.min.bytes) ? "small" : "large"));
return false;
}
}
if (bytesReadInRange > 0) {
TraceEvent("ConsistencyCheck_ReadRange")
.suppressFor(1.0)
.detail("Range", range)
.detail("BytesRead", bytesReadInRange);
}
}
// SOMEDAY: when background data distribution is implemented, include this test
// In a quiescent database, check that the sizes of storage servers are roughly the same
/*if(self->performQuiescentChecks)
{
auto minStorageServer = std::min_element(storageServerSizes.begin(), storageServerSizes.end(),
ConsistencyCheckWorkload::compareByValue<UID, int64_t>); auto maxStorageServer =
std::max_element(storageServerSizes.begin(), storageServerSizes.end(),
ConsistencyCheckWorkload::compareByValue<UID, int64_t>);
int bias = SERVER_KNOBS->MIN_SHARD_BYTES;
if(1.1 * (minStorageServer->second + SERVER_KNOBS->MIN_SHARD_BYTES) < maxStorageServer->second +
SERVER_KNOBS->MIN_SHARD_BYTES)
{
TraceEvent("ConsistencyCheck_InconsistentStorageServerSizes").detail("MinSize", minStorageServer->second).detail("MaxSize", maxStorageServer->second)
.detail("MinStorageServer", minStorageServer->first).detail("MaxStorageServer",
maxStorageServer->first);
self->testFailure(format("Storage servers differ significantly in size by a factor of %f",
((double)maxStorageServer->second) / minStorageServer->second)); return false;
}
}*/
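// Remember how much was read this round so the next round can size its rate limit accordingly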
self->bytesReadInPreviousRound = bytesReadInThisRound;
return true;
}