ACTOR Future<bool> checkDataConsistency()

in fdbserver/workloads/ConsistencyCheck.actor.cpp [1138:1715]


	ACTOR Future<bool> checkDataConsistency(Database cx,
	                                        VectorRef<KeyValueRef> keyLocations,
	                                        DatabaseConfiguration configuration,
	                                        std::map<UID, StorageServerInterface> tssMapping,
	                                        ConsistencyCheckWorkload* self) {
		// Stores the total number of bytes on each storage server
		// In a distributed test, this will be an estimated size
		state std::map<UID, int64_t> storageServerSizes;

		// Iterate through each shard, checking its values on all of its storage servers
		// If shardSampleFactor > 1, then not all shards are processed
		// Also, in a distributed data consistency check, each client processes a subset of the shards
		// Note: this may cause some shards to be processed more than once or not at all in a non-quiescent database
		state int effectiveClientCount = (self->distributed) ? self->clientCount : 1;
		state int i = self->clientId * (self->shardSampleFactor + 1);
		state int increment =
		    (self->distributed && !self->firstClient) ? effectiveClientCount * self->shardSampleFactor : 1;
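		// Pick this round's rate limit so that reading as many bytes as the previous round would finish in roughly
		// CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME seconds, capped at rateLimitMax. The first round has
		// no history, so it runs at rateLimitMax. For example (hypothetical numbers), 50 GB read in the previous
		// round with a 600 second target would yield a limit of ceil(50e9 / 600) ~ 83 MB/s.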
		state int rateLimitForThisRound =
		    self->bytesReadInPreviousRound == 0
		        ? self->rateLimitMax
		        : std::min(
		              self->rateLimitMax,
		              static_cast<int>(ceil(self->bytesReadInPreviousRound /
		                                    (float)CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME)));
		ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= self->rateLimitMax);
		TraceEvent("ConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound);
		state Reference<IRateControl> rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
		state double rateLimiterStartTime = now();
		state int64_t bytesReadInthisRound = 0;

		state double dbSize = 100e12;
		if (g_network->isSimulated()) {
			// This call will get all shard ranges in the database, which is too expensive on real clusters.
			int64_t _dbSize = wait(self->getDatabaseSize(cx));
			dbSize = _dbSize;
		}

		state std::vector<KeyRangeRef> ranges;
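		// Each consecutive pair of boundary keys in keyLocations bounds one shard; the value stored at a boundary
		// encodes that shard's source and destination storage servers (decoded per shard below).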

		for (int k = 0; k < keyLocations.size() - 1; k++) {
			KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key);
			ranges.push_back(range);
		}

		state std::vector<int> shardOrder;
		shardOrder.reserve(ranges.size());
		for (int k = 0; k < ranges.size(); k++)
			shardOrder.push_back(k);
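		// Optionally shuffle the order in which shards are visited. The seed combines the shared random number and
		// the repetition count, so all clients that shuffle compute the same order for a given round.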
		if (self->shuffleShards) {
			uint32_t seed = self->sharedRandomNumber + self->repetitions;
			DeterministicRandom sharedRandom(seed == 0 ? 1 : seed);
			sharedRandom.randomShuffle(shardOrder);
		}

		for (; i < ranges.size(); i += increment) {
			state int shard = shardOrder[i];

			state KeyRangeRef range = ranges[shard];
			state std::vector<UID> sourceStorageServers;
			state std::vector<UID> destStorageServers;
			state Transaction tr(cx);
			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
			state int bytesReadInRange = 0;

			RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
			ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
			decodeKeyServersValue(
			    UIDtoTagMap, keyLocations[shard].value, sourceStorageServers, destStorageServers, false);

			// If destStorageServers is non-empty, then this shard is being relocated
			state bool isRelocating = destStorageServers.size() > 0;

			// This check was disabled because we now disable data distribution during the consistency check,
			// which can leave shards with dest storage servers.

			// Disallow relocations in a quiescent database
			/*if(self->firstClient && self->performQuiescentChecks && isRelocating)
			{
			    TraceEvent("ConsistencyCheck_QuiescentShardRelocation").detail("ShardBegin", printable(range.start)).detail("ShardEnd", printable(range.end));
			    self->testFailure("Shard is being relocated in quiescent database");
			    return false;
			}*/

			// In a quiescent database, check that the team size is the same as the desired team size
			if (self->firstClient && self->performQuiescentChecks &&
			    sourceStorageServers.size() != configuration.usableRegions * configuration.storageTeamSize) {
				TraceEvent("ConsistencyCheck_InvalidTeamSize")
				    .detail("ShardBegin", printable(range.begin))
				    .detail("ShardEnd", printable(range.end))
				    .detail("SourceTeamSize", sourceStorageServers.size())
				    .detail("DestServerSize", destStorageServers.size())
				    .detail("ConfigStorageTeamSize", configuration.storageTeamSize)
				    .detail("UsableRegions", configuration.usableRegions);
				// Record the servers responsible for the problematic shard
				int i = 0;
				for (auto& id : sourceStorageServers) {
					TraceEvent("IncorrectSizeTeamInfo").detail("ServerUID", id).detail("TeamIndex", i++);
				}
				self->testFailure("Invalid team size");
				return false;
			}

			state std::vector<UID> storageServers = (isRelocating) ? destStorageServers : sourceStorageServers;
			state std::vector<StorageServerInterface> storageServerInterfaces;

			//TraceEvent("ConsistencyCheck_GetStorageInfo").detail("StorageServers", storageServers.size());
			loop {
				try {
					std::vector<Future<Optional<Value>>> serverListEntries;
					serverListEntries.reserve(storageServers.size());
					for (int s = 0; s < storageServers.size(); s++)
						serverListEntries.push_back(tr.get(serverListKeyFor(storageServers[s])));
					state std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
					for (int s = 0; s < serverListValues.size(); s++) {
						if (serverListValues[s].present())
							storageServerInterfaces.push_back(decodeServerListValue(serverListValues[s].get()));
						else if (self->performQuiescentChecks)
							self->testFailure("/FF/serverList changing in a quiescent database");
					}

					break;
				} catch (Error& e) {
					wait(tr.onError(e));
				}
			}

			// Add TSSes to the end of the list, if configured and if not relocating
			if (!isRelocating && self->performTSSCheck) {
				int initialSize = storageServers.size();
				for (int i = 0; i < initialSize; i++) {
					auto tssPair = tssMapping.find(storageServers[i]);
					if (tssPair != tssMapping.end()) {
						TEST(true); // TSS checked in consistency check
						storageServers.push_back(tssPair->second.id());
						storageServerInterfaces.push_back(tssPair->second);
					}
				}
			}

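			// Ask each storage server (including any TSSes appended above) for its sampled estimate of this shard's
			// size in bytes.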
			state std::vector<int64_t> estimatedBytes =
			    wait(self->getStorageSizeEstimate(storageServerInterfaces, range));

			// Gets permitted size range of shard
			int64_t maxShardSize = getMaxShardSize(dbSize);
			state ShardSizeBounds shardBounds = getShardSizeBounds(range, maxShardSize);

			if (self->firstClient) {
				// If there was an error retrieving shard estimated size
				if (self->performQuiescentChecks && estimatedBytes.size() == 0)
					self->testFailure("Error fetching storage metrics");

				// If running a distributed test, storage server size is an accumulation of shard estimates
				else if (self->distributed && self->firstClient)
					for (int j = 0; j < storageServers.size(); j++)
						storageServerSizes[storageServers[j]] += std::max(estimatedBytes[j], (int64_t)0);
			}

			// The first client may need to skip the rest of the loop contents if it is just processing this shard to
			// get a size estimate
			if (!self->firstClient || shard % (effectiveClientCount * self->shardSampleFactor) == 0) {
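				// Per-shard accounting: shardBytes is the actual size summed from the data read, sampledBytes is the
				// size reconstructed from the byte sample, shardVariance tracks the variance of that estimate, and
				// canSplit/splitBytes record whether a valid split point was found within the sampled data.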
				state int shardKeys = 0;
				state int shardBytes = 0;
				state int sampledBytes = 0;
				state int splitBytes = 0;
				state int firstKeySampledBytes = 0;
				state int sampledKeys = 0;
				state int sampledKeysWithProb = 0;
				state double shardVariance = 0;
				state bool canSplit = false;
				state Key lastSampleKey;
				state Key lastStartSampleKey;
				state int64_t totalReadAmount = 0;

				state KeySelector begin = firstGreaterOrEqual(range.begin);
				state Transaction onErrorTr(
				    cx); // This transaction exists only to access onError and its backoff behavior

				// Read a limited number of entries at a time, repeating until all keys in the shard have been read
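				// Each iteration picks a read version, issues the same GetKeyValuesRequest to every replica of the
				// shard, and compares each reply key-by-key against the first server that responded successfully.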
				loop {
					try {
						lastSampleKey = lastStartSampleKey;

						// Get the min version of the storage servers
						Version version = wait(self->getVersion(cx, self));

						state GetKeyValuesRequest req;
						req.begin = begin;
						req.end = firstGreaterOrEqual(range.end);
						req.limit = 1e4;
						req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
						req.version = version;
						req.tags = TagSet();

						// Try getting the entries in the specified range
						state std::vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
						state int j = 0;
						for (j = 0; j < storageServerInterfaces.size(); j++) {
							resetReply(req);
							keyValueFutures.push_back(
							    storageServerInterfaces[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
						}

						wait(waitForAll(keyValueFutures));

						// Read the resulting entries
						state int firstValidServer = -1;
						totalReadAmount = 0;
						for (j = 0; j < keyValueFutures.size(); j++) {
							ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();

							// Compare the results with other storage servers
							if (rangeResult.present() && !rangeResult.get().error.present()) {
								state GetKeyValuesReply current = rangeResult.get();
								totalReadAmount += current.data.expectedSize();
								// If we haven't encountered a valid storage server yet, then mark this as the baseline
								// to compare against
								if (firstValidServer == -1)
									firstValidServer = j;

								// Compare this shard against the first
								else {
									GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();

									if (current.data != reference.data || current.more != reference.more) {
										// Be especially verbose if in simulation
										if (g_network->isSimulated()) {
											int invalidIndex = -1;
											printf("\n%sSERVER %d (%s); shard = %s - %s:\n",
											       storageServerInterfaces[j].isTss() ? "TSS " : "",
											       j,
											       storageServerInterfaces[j].address().toString().c_str(),
											       printable(req.begin.getKey()).c_str(),
											       printable(req.end.getKey()).c_str());
											for (int k = 0; k < current.data.size(); k++) {
												printf("%d. %s => %s\n",
												       k,
												       printable(current.data[k].key).c_str(),
												       printable(current.data[k].value).c_str());
												if (invalidIndex < 0 &&
												    (k >= reference.data.size() ||
												     current.data[k].key != reference.data[k].key ||
												     current.data[k].value != reference.data[k].value))
													invalidIndex = k;
											}

											printf(
											    "\n%sSERVER %d (%s); shard = %s - %s:\n",
											    storageServerInterfaces[firstValidServer].isTss() ? "TSS " : "",
											    firstValidServer,
											    storageServerInterfaces[firstValidServer].address().toString().c_str(),
											    printable(req.begin.getKey()).c_str(),
											    printable(req.end.getKey()).c_str());
											for (int k = 0; k < reference.data.size(); k++) {
												printf("%d. %s => %s\n",
												       k,
												       printable(reference.data[k].key).c_str(),
												       printable(reference.data[k].value).c_str());
												if (invalidIndex < 0 &&
												    (k >= current.data.size() ||
												     reference.data[k].key != current.data[k].key ||
												     reference.data[k].value != current.data[k].value))
													invalidIndex = k;
											}

											printf("\nMISMATCH AT %d\n\n", invalidIndex);
										}

										// Data for trace event
										// The number of keys unique to the current shard
										int currentUniques = 0;
										// The number of keys unique to the reference shard
										int referenceUniques = 0;
										// The number of keys in both shards with conflicting values
										int valueMismatches = 0;
										// The number of keys in both shards with matching values
										int matchingKVPairs = 0;
										// Last unique key on the current shard
										KeyRef currentUniqueKey;
										// Last unique key on the reference shard
										KeyRef referenceUniqueKey;
										// Last value mismatch
										KeyRef valueMismatchKey;

										// Loop indices
										int currentI = 0;
										int referenceI = 0;
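										// Walk both (sorted) result sets in parallel, classifying each key as
										// unique to one server, a value mismatch, or a matching pair.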
										while (currentI < current.data.size() || referenceI < reference.data.size()) {
											if (currentI >= current.data.size()) {
												referenceUniqueKey = reference.data[referenceI].key;
												referenceUniques++;
												referenceI++;
											} else if (referenceI >= reference.data.size()) {
												currentUniqueKey = current.data[currentI].key;
												currentUniques++;
												currentI++;
											} else {
												KeyValueRef currentKV = current.data[currentI];
												KeyValueRef referenceKV = reference.data[referenceI];

												if (currentKV.key == referenceKV.key) {
													if (currentKV.value == referenceKV.value)
														matchingKVPairs++;
													else {
														valueMismatchKey = currentKV.key;
														valueMismatches++;
													}

													currentI++;
													referenceI++;
												} else if (currentKV.key < referenceKV.key) {
													currentUniqueKey = currentKV.key;
													currentUniques++;
													currentI++;
												} else {
													referenceUniqueKey = referenceKV.key;
													referenceUniques++;
													referenceI++;
												}
											}
										}

										TraceEvent("ConsistencyCheck_DataInconsistent")
										    .detail(format("StorageServer%d", j).c_str(), storageServers[j].toString())
										    .detail(format("StorageServer%d", firstValidServer).c_str(),
										            storageServers[firstValidServer].toString())
										    .detail("ShardBegin", req.begin.getKey())
										    .detail("ShardEnd", req.end.getKey())
										    .detail("VersionNumber", req.version)
										    .detail(format("Server%dUniques", j).c_str(), currentUniques)
										    .detail(format("Server%dUniqueKey", j).c_str(), currentUniqueKey)
										    .detail(format("Server%dUniques", firstValidServer).c_str(),
										            referenceUniques)
										    .detail(format("Server%dUniqueKey", firstValidServer).c_str(),
										            referenceUniqueKey)
										    .detail("ValueMismatches", valueMismatches)
										    .detail("ValueMismatchKey", valueMismatchKey)
										    .detail("MatchingKVPairs", matchingKVPairs)
										    .detail("IsTSS",
										            storageServerInterfaces[j].isTss() ||
										                    storageServerInterfaces[firstValidServer].isTss()
										                ? "True"
										                : "False");

										if ((g_network->isSimulated() &&
										     g_simulator.tssMode != ISimulator::TSSMode::EnabledDropMutations) ||
										    (!storageServerInterfaces[j].isTss() &&
										     !storageServerInterfaces[firstValidServer].isTss())) {
											self->testFailure("Data inconsistent", true);
											return false;
										}
									}
								}
							}

							// If the data is not available and we aren't relocating this shard
							else if (!isRelocating) {
								Error e =
								    rangeResult.isError() ? rangeResult.getError() : rangeResult.get().error.get();

								TraceEvent("ConsistencyCheck_StorageServerUnavailable")
								    .suppressFor(1.0)
								    .detail("StorageServer", storageServers[j])
								    .detail("ShardBegin", printable(range.begin))
								    .detail("ShardEnd", printable(range.end))
								    .detail("Address", storageServerInterfaces[j].address())
								    .detail("UID", storageServerInterfaces[j].id())
								    .detail("GetKeyValuesToken",
								            storageServerInterfaces[j].getKeyValues.getEndpoint().token)
								    .detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False")
								    .error(e);

								// All shards should be available in quiescence
								if (self->performQuiescentChecks && !storageServerInterfaces[j].isTss()) {
									self->testFailure("Storage server unavailable");
									return false;
								}
							}
						}

						if (firstValidServer >= 0) {
							VectorRef<KeyValueRef> data = keyValueFutures[firstValidServer].get().get().data;
							// Calculate the size of the shard, the variance of the shard size estimate, and the correct
							// shard size estimate
							for (int k = 0; k < data.size(); k++) {
								ByteSampleInfo sampleInfo = isKeyValueInSample(data[k]);
								shardBytes += sampleInfo.size;
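								// The byte sample includes this key with probability p = size / sampledSize and, if
								// included, records sampledSize bytes; the estimate's variance therefore grows by
								// p * (1 - p) * sampledSize^2 for each key (Bernoulli sampling).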
								double itemProbability = ((double)sampleInfo.size) / sampleInfo.sampledSize;
								if (itemProbability < 1)
									shardVariance += itemProbability * (1 - itemProbability) *
									                 pow((double)sampleInfo.sampledSize, 2);

								if (sampleInfo.inSample) {
									sampledBytes += sampleInfo.sampledSize;
									if (!canSplit && sampledBytes >= shardBounds.min.bytes &&
									    data[k].key.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT &&
									    sampledBytes <= shardBounds.max.bytes *
									                        CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT / 2) {
										canSplit = true;
										splitBytes = sampledBytes;
									}

									/*TraceEvent("ConsistencyCheck_ByteSample").detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end))
									    .detail("SampledBytes", sampleInfo.sampledSize).detail("Key",
									   printable(data[k].key)).detail("KeySize", data[k].key.size()).detail("ValueSize",
									   data[k].value.size());*/

									// In data distribution, the splitting process ignores the first key in a shard.
									// Thus, we shouldn't consider it when validating the upper bound of estimated shard
									// sizes
									if (k == 0)
										firstKeySampledBytes += sampleInfo.sampledSize;

									sampledKeys++;
									if (itemProbability < 1) {
										sampledKeysWithProb++;
									}
								}
							}

							// Accumulate number of keys in this shard
							shardKeys += data.size();
						}
						// After each batch of reads, enforce the rate limit based on how much data was just read
						if (rateLimitForThisRound > 0) {
							wait(rateLimiter->getAllowance(totalReadAmount));
							// Set the rate limit to the max allowed if the current round has been going on for a while
							if (now() - rateLimiterStartTime >
							        1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME &&
							    rateLimitForThisRound != self->rateLimitMax) {
								rateLimitForThisRound = self->rateLimitMax;
								rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
								rateLimiterStartTime = now();
								TraceEvent(SevInfo, "ConsistencyCheck_RateLimitSetMaxForThisRound")
								    .detail("RateLimit", rateLimitForThisRound);
							}
						}
						bytesReadInRange += totalReadAmount;
						bytesReadInthisRound += totalReadAmount;

						// Advance to the next set of entries
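						// A reply with more == true hit the row or byte limit, so continue from just past the last
						// key returned by the baseline server.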
						if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) {
							VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
							ASSERT(result.size() > 0);
							begin = firstGreaterThan(result[result.size() - 1].key);
							ASSERT(begin.getKey() != allKeys.end);
							lastStartSampleKey = lastSampleKey;
						} else
							break;
					} catch (Error& e) {
						state Error err = e;
						wait(onErrorTr.onError(err));
						TraceEvent("ConsistencyCheck_RetryDataConsistency").error(err);
					}
				}

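				// The split point found above only counts if the portion of the shard after it would also be at
				// least the minimum shard size.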
				canSplit = canSplit && sampledBytes - splitBytes >= shardBounds.min.bytes && sampledBytes > splitBytes;

				// Update the size of all storage servers containing this shard
				// This is only done in a non-distributed consistency check; the distributed check uses shard size
				// estimates
				if (!self->distributed)
					for (int j = 0; j < storageServers.size(); j++)
						storageServerSizes[storageServers[j]] += shardBytes;

				// FIXME: Where is this intended to be used?
				[[maybe_unused]] bool hasValidEstimate = estimatedBytes.size() > 0;

				// If the storage servers' sampled estimate of shard size is different from ours
				if (self->performQuiescentChecks) {
					for (int j = 0; j < estimatedBytes.size(); j++) {
						if (estimatedBytes[j] >= 0 && estimatedBytes[j] != sampledBytes) {
							TraceEvent("ConsistencyCheck_IncorrectEstimate")
							    .detail("EstimatedBytes", estimatedBytes[j])
							    .detail("CorrectSampledBytes", sampledBytes)
							    .detail("StorageServer", storageServers[j])
							    .detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False");

							if (!storageServerInterfaces[j].isTss()) {
								self->testFailure("Storage servers had incorrect sampled estimate");
							}

							hasValidEstimate = false;

							break;
						} else if (estimatedBytes[j] < 0 &&
						           (g_network->isSimulated() || !storageServerInterfaces[j].isTss())) {
							self->testFailure("Could not get storage metrics from server");
							hasValidEstimate = false;
							break;
						}
					}
				}

				// Compute the difference between the shard size estimate and its actual size. If the difference is
				// sufficiently large, then fail
				double stdDev = sqrt(shardVariance);

				double failErrorNumStdDev = 7;
				int estimateError = abs(shardBytes - sampledBytes);

				// Only perform the check if there are sufficient keys to get a distribution that should resemble a
				// normal distribution
				if (sampledKeysWithProb > 30 && estimateError > failErrorNumStdDev * stdDev) {
					double numStdDev = estimateError / sqrt(shardVariance);
					TraceEvent("ConsistencyCheck_InaccurateShardEstimate")
					    .detail("Min", shardBounds.min.bytes)
					    .detail("Max", shardBounds.max.bytes)
					    .detail("Estimate", sampledBytes)
					    .detail("Actual", shardBytes)
					    .detail("NumStdDev", numStdDev)
					    .detail("Variance", shardVariance)
					    .detail("StdDev", stdDev)
					    .detail("ShardBegin", printable(range.begin))
					    .detail("ShardEnd", printable(range.end))
					    .detail("NumKeys", shardKeys)
					    .detail("NumSampledKeys", sampledKeys)
					    .detail("NumSampledKeysWithProb", sampledKeysWithProb);

					self->testFailure(format("Shard size is more than %f std dev from estimate", failErrorNumStdDev));
				}

				// In a quiescent database, check that the (estimated) size of the shard is within permitted bounds.
				// Min and max shard sizes have a 3 * shardBounds.permittedError.bytes cushion for error since shard
				// sizes are not precise. Shard splits ignore the first key in a shard, so its size shouldn't be
				// considered when checking the upper bound. 0xff shards are not checked.
				if (canSplit && sampledKeys > 5 && self->performQuiescentChecks &&
				    !range.begin.startsWith(keyServersPrefix) &&
				    (sampledBytes < shardBounds.min.bytes - 3 * shardBounds.permittedError.bytes ||
				     sampledBytes - firstKeySampledBytes >
				         shardBounds.max.bytes + 3 * shardBounds.permittedError.bytes)) {
					TraceEvent("ConsistencyCheck_InvalidShardSize")
					    .detail("Min", shardBounds.min.bytes)
					    .detail("Max", shardBounds.max.bytes)
					    .detail("Size", shardBytes)
					    .detail("EstimatedSize", sampledBytes)
					    .detail("ShardBegin", printable(range.begin))
					    .detail("ShardEnd", printable(range.end))
					    .detail("ShardCount", ranges.size())
					    .detail("SampledKeys", sampledKeys);
					self->testFailure(format("Shard size in quiescent database is too %s",
					                         (sampledBytes < shardBounds.min.bytes) ? "small" : "large"));
					return false;
				}
			}

			if (bytesReadInRange > 0) {
				TraceEvent("ConsistencyCheck_ReadRange")
				    .suppressFor(1.0)
				    .detail("Range", range)
				    .detail("BytesRead", bytesReadInRange);
			}
		}

		// SOMEDAY: when background data distribution is implemented, include this test
		// In a quiescent database, check that the sizes of storage servers are roughly the same
		/*if(self->performQuiescentChecks)
		{
		    auto minStorageServer = std::min_element(storageServerSizes.begin(), storageServerSizes.end(),
		ConsistencyCheckWorkload::compareByValue<UID, int64_t>); auto maxStorageServer =
		std::max_element(storageServerSizes.begin(), storageServerSizes.end(),
		ConsistencyCheckWorkload::compareByValue<UID, int64_t>);

		    int bias = SERVER_KNOBS->MIN_SHARD_BYTES;
		    if(1.1 * (minStorageServer->second + SERVER_KNOBS->MIN_SHARD_BYTES) < maxStorageServer->second +
		SERVER_KNOBS->MIN_SHARD_BYTES)
		    {
		        TraceEvent("ConsistencyCheck_InconsistentStorageServerSizes").detail("MinSize", minStorageServer->second).detail("MaxSize", maxStorageServer->second)
		            .detail("MinStorageServer", minStorageServer->first).detail("MaxStorageServer",
		maxStorageServer->first);

		        self->testFailure(format("Storage servers differ significantly in size by a factor of %f",
		((double)maxStorageServer->second) / minStorageServer->second)); return false;
		    }
		}*/

		self->bytesReadInPreviousRound = bytesReadInthisRound;
		return true;
	}