in cachelib/cachebench/runner/CacheStressor.h [243:416]
  void stressByDiscreteDistribution(ThroughputStats& stats) {
    std::mt19937_64 gen(folly::Random::rand64());
    std::discrete_distribution<> opPoolDist(config_.opPoolDistribution.begin(),
                                            config_.opPoolDistribution.end());

    const uint64_t opDelayBatch = config_.opDelayBatch;
    const uint64_t opDelayNs = config_.opDelayNs;
    const std::chrono::nanoseconds opDelay(opDelayNs);
    const bool needDelay = opDelayBatch != 0 && opDelayNs != 0;
    uint64_t opCounter = 0;
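    // Throttle helper: sleep for opDelayNs once every opDelayBatch
    // operations, then enforce the configured rate limit. Invoked after
    // every operation via SCOPE_EXIT below.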
    auto throttleFn = [&] {
      if (needDelay && ++opCounter == opDelayBatch) {
        opCounter = 0;
        std::this_thread::sleep_for(opDelay);
      }
      // Limit the rate if specified.
      limitRate();
    };
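    // Id of the request completed in the previous iteration; it is handed
    // back to getReq() so the workload generator can tell which request this
    // thread finished last.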
    std::optional<uint64_t> lastRequestId = std::nullopt;
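    // Keep issuing operations until numOps is reached, bailing out early if
    // the cache reports inconsistencies or invalid destructor calls, if the
    // NVM cache gets disabled, or if the test is asked to stop.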
    for (uint64_t i = 0;
         i < config_.numOps &&
         cache_->getInconsistencyCount() < config_.maxInconsistencyCount &&
         cache_->getInvalidDestructorCount() <
             config_.maxInvalidDestructorCount &&
         !cache_->isNvmCacheDisabled() && !shouldTestStop();
         ++i) {
      try {
        // at the end of every operation, throttle per the config.
        SCOPE_EXIT { throttleFn(); };

        // detect refcount leaks when run in debug mode.
#ifndef NDEBUG
        auto checkCnt = [](int cnt) {
          if (cnt != 0) {
            throw std::runtime_error(folly::sformat("Refcount leak {}", cnt));
          }
        };
        checkCnt(cache_->getHandleCountForThread());
        SCOPE_EXIT { checkCnt(cache_->getHandleCountForThread()); };
#endif

        ++stats.ops;

        const auto pid = static_cast<PoolId>(opPoolDist(gen));
        const Request& req(getReq(pid, gen, lastRequestId));
        OpType op = req.getOp();
        const std::string* key = &(req.key);
        std::string oneHitKey;
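        // Lone ops use a freshly generated unique key instead of the
        // request's key, so they never touch the regular working set.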
        if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
          oneHitKey = Request::getUniqueKey();
          key = &oneHitKey;
        }
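        // Execute the operation. The result is reported back to the workload
        // generator after the switch below.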
        OpResultType result(OpResultType::kNop);
        switch (op) {
          case OpType::kLoneSet:
          case OpType::kSet: {
            auto lock = chainedItemAcquireUniqueLock(*key);
            result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                            req.admFeatureMap);
            break;
          }
          case OpType::kLoneGet:
          case OpType::kGet: {
            ++stats.get;
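            // Lookups run under the shared lock for this key; it is swapped
            // for the unique lock only when filling the cache on a miss
            // (lookaside) below.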
            auto slock = chainedItemAcquireSharedLock(*key);
            auto xlock = decltype(chainedItemAcquireUniqueLock(*key)){};

            if (ticker_) {
              ticker_->updateTimeStamp(req.timestamp);
            }
            // TODO: currently pure lookaside; we should add a distribution
            // over sequences of requests/access patterns, e.g. get-no-set
            // and set-no-get.
            cache_->recordAccess(*key);
            auto it = cache_->find(*key, AccessMode::kRead);
            if (it == nullptr) {
              ++stats.getMiss;
              result = OpResultType::kGetMiss;

              if (config_.enableLookaside) {
                // Allocate and insert on miss. This requires upgrading access
                // privileges to the unique lock (an in-place lock upgrade is
                // not appropriate here).
                slock = {};
                xlock = chainedItemAcquireUniqueLock(*key);
                setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                       req.admFeatureMap);
              }
            } else {
              result = OpResultType::kGetHit;
            }
            break;
          }
          case OpType::kDel: {
            ++stats.del;

            auto lock = chainedItemAcquireUniqueLock(*key);
            auto res = cache_->remove(*key);
            if (res == CacheT::RemoveRes::kNotFoundInRam) {
              ++stats.delNotFound;
            }
            break;
          }
          case OpType::kAddChained: {
            ++stats.get;

            auto lock = chainedItemAcquireUniqueLock(*key);
            auto it = cache_->find(*key, AccessMode::kRead);
            if (!it) {
              ++stats.getMiss;
              ++stats.set;
              it = cache_->allocate(pid, *key, *(req.sizeBegin), req.ttlSecs);
              if (!it) {
                ++stats.setFailure;
                break;
              }
              populateItem(it);
              cache_->insertOrReplace(it);
            }
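            // The first size is the parent item; every remaining size becomes
            // a chained child, so the request must carry more than one size.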
            XDCHECK(req.sizeBegin + 1 != req.sizeEnd);
            bool chainSuccessful = false;
            for (auto j = req.sizeBegin + 1; j != req.sizeEnd; j++) {
              ++stats.addChained;

              const auto size = *j;
              auto child = cache_->allocateChainedItem(it, size);
              if (!child) {
                ++stats.addChainedFailure;
                continue;
              }
              chainSuccessful = true;
              populateItem(child);
              cache_->addChainedItem(it, std::move(child));
            }
            if (chainSuccessful && cache_->consistencyCheckEnabled()) {
              cache_->trackChainChecksum(it);
            }
            break;
          }
          case OpType::kUpdate: {
            ++stats.get;
            ++stats.update;

            auto lock = chainedItemAcquireUniqueLock(*key);
            if (ticker_) {
              ticker_->updateTimeStamp(req.timestamp);
            }
            auto it = cache_->find(*key, AccessMode::kWrite);
            if (it == nullptr) {
              ++stats.getMiss;
              ++stats.updateMiss;
              break;
            }
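            // Record the in-place update by bumping the item's record version.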
            cache_->updateItemRecordVersion(it);
            break;
          }
          default:
            throw std::runtime_error(
                folly::sformat("invalid operation generated: {}", (int)op));
            break;
        }
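        // Report the result to the workload generator; lastRequestId feeds
        // the id back into the next getReq() call.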
        lastRequestId = req.requestId;
        if (req.requestId) {
          // req might be deleted after calling notifyResult()
          wg_->notifyResult(*req.requestId, result);
        }
      } catch (const cachebench::EndOfTrace& ex) {
        break;
      }
    }
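    // Let the workload generator know this stressor thread is done.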
    wg_->markFinish();
  }