void stressByDiscreteDistribution()

in cachelib/cachebench/runner/CacheStressor.h [243:416]


  // Runs the stress loop for the calling thread: repeatedly picks a pool from
  // the configured discrete distribution, fetches the next request for that
  // pool via getReq(), executes the requested operation against cache_, and
  // records the outcome in `stats`.
  //
  // The loop ends when any of the following holds:
  //   - config_.numOps iterations have been executed,
  //   - the cache's inconsistency count reaches config_.maxInconsistencyCount,
  //   - the cache's invalid destructor count reaches
  //     config_.maxInvalidDestructorCount,
  //   - cache_->isNvmCacheDisabled() returns true,
  //   - shouldTestStop() returns true,
  //   - a cachebench::EndOfTrace exception is caught.
  // wg_->markFinish() is called once the loop has ended.
  //
  // @param stats  counters updated in place for every operation executed.
  //
  // @throw std::runtime_error  if the request carries an operation type that
  //                            is not handled below. In builds without NDEBUG
  //                            the same exception type is used to report a
  //                            non-zero per-thread handle count.
  void stressByDiscreteDistribution(ThroughputStats& stats) {
    std::mt19937_64 gen(folly::Random::rand64());
    std::discrete_distribution<> opPoolDist(config_.opPoolDistribution.begin(),
                                            config_.opPoolDistribution.end());
    const uint64_t opDelayBatch = config_.opDelayBatch;
    const uint64_t opDelayNs = config_.opDelayNs;
    const std::chrono::nanoseconds opDelay(opDelayNs);

    // Batch-based delay is active only when both knobs are non-zero.
    const bool needDelay = opDelayBatch != 0 && opDelayNs != 0;
    uint64_t opCounter = 0;
    auto throttleFn = [&] {
      // Sleep once for every opDelayBatch operations.
      if (needDelay && ++opCounter == opDelayBatch) {
        opCounter = 0;
        std::this_thread::sleep_for(opDelay);
      }
      // Limit the rate if specified.
      limitRate();
    };

    std::optional<uint64_t> lastRequestId = std::nullopt;
    for (uint64_t i = 0;
         i < config_.numOps &&
         cache_->getInconsistencyCount() < config_.maxInconsistencyCount &&
         cache_->getInvalidDestructorCount() <
             config_.maxInvalidDestructorCount &&
         !cache_->isNvmCacheDisabled() && !shouldTestStop();
         ++i) {
      try {
        // At the end of every operation, throttle per the config. The guard is
        // declared first so that it fires after every other scope-local object
        // in this iteration (locks, handles) has been destroyed.
        SCOPE_EXIT { throttleFn(); };
        // Detect refcount leaks when run in debug mode.
#ifndef NDEBUG
        auto checkCnt = [](int cnt) {
          if (cnt != 0) {
            throw std::runtime_error(folly::sformat("Refcount leak {}", cnt));
          }
        };
        // The thread must hold no handles both before and after the operation.
        // NOTE(review): the second check throws from inside a SCOPE_EXIT body;
        // confirm how folly's scope guard treats an exception escaping it.
        checkCnt(cache_->getHandleCountForThread());
        SCOPE_EXIT { checkCnt(cache_->getHandleCountForThread()); };
#endif
        ++stats.ops;

        const auto pid = static_cast<PoolId>(opPoolDist(gen));
        const Request& req(getReq(pid, gen, lastRequestId));
        OpType op = req.getOp();
        const std::string* key = &(req.key);
        // "Lone" operations ignore the request's key and use a freshly
        // generated unique key instead.
        std::string oneHitKey;
        if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
          oneHitKey = Request::getUniqueKey();
          key = &oneHitKey;
        }

        OpResultType result(OpResultType::kNop);
        switch (op) {
        case OpType::kLoneSet:
        case OpType::kSet: {
          auto lock = chainedItemAcquireUniqueLock(*key);
          result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                          req.admFeatureMap);

          break;
        }
        case OpType::kLoneGet:
        case OpType::kGet: {
          ++stats.get;

          auto slock = chainedItemAcquireSharedLock(*key);
          // Default-constructed placeholder; only assigned a real lock if the
          // lookaside path below needs exclusive access.
          auto xlock = decltype(chainedItemAcquireUniqueLock(*key)){};

          if (ticker_) {
            ticker_->updateTimeStamp(req.timestamp);
          }
          // TODO currently pure lookaside, we should
          // add a distribution over sequences of requests/access patterns
          // e.g. get-no-set and set-no-get
          cache_->recordAccess(*key);
          auto it = cache_->find(*key, AccessMode::kRead);
          if (it == nullptr) {
            ++stats.getMiss;
            result = OpResultType::kGetMiss;

            if (config_.enableLookaside) {
              // Allocate and insert on miss.
              // Upgrade access privileges (lock_upgrade is not
              // appropriate here): release the shared lock first, then take
              // the unique lock.
              slock = {};
              xlock = chainedItemAcquireUniqueLock(*key);
              // The set outcome is deliberately not propagated; the reported
              // result for this request stays kGetMiss.
              setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                     req.admFeatureMap);
            }
          } else {
            result = OpResultType::kGetHit;
          }

          break;
        }
        case OpType::kDel: {
          ++stats.del;
          auto lock = chainedItemAcquireUniqueLock(*key);
          auto res = cache_->remove(*key);
          if (res == CacheT::RemoveRes::kNotFoundInRam) {
            ++stats.delNotFound;
          }
          break;
        }
        case OpType::kAddChained: {
          ++stats.get;
          auto lock = chainedItemAcquireUniqueLock(*key);
          auto it = cache_->find(*key, AccessMode::kRead);
          if (!it) {
            // Parent is missing: allocate and insert it before chaining.
            ++stats.getMiss;

            ++stats.set;
            it = cache_->allocate(pid, *key, *(req.sizeBegin), req.ttlSecs);
            if (!it) {
              ++stats.setFailure;
              break;
            }
            populateItem(it);
            cache_->insertOrReplace(it);
          }
          // The first size belongs to the parent; the remaining sizes describe
          // the chained children, so at least one must follow.
          XDCHECK(req.sizeBegin + 1 != req.sizeEnd);
          bool chainSuccessful = false;
          for (auto j = req.sizeBegin + 1; j != req.sizeEnd; ++j) {
            ++stats.addChained;

            const auto size = *j;
            auto child = cache_->allocateChainedItem(it, size);
            if (!child) {
              ++stats.addChainedFailure;
              continue;
            }
            chainSuccessful = true;
            populateItem(child);
            cache_->addChainedItem(it, std::move(child));
          }
          // Only track the chain checksum if at least one child was attached.
          if (chainSuccessful && cache_->consistencyCheckEnabled()) {
            cache_->trackChainChecksum(it);
          }
          break;
        }
        case OpType::kUpdate: {
          ++stats.get;
          ++stats.update;
          auto lock = chainedItemAcquireUniqueLock(*key);
          if (ticker_) {
            ticker_->updateTimeStamp(req.timestamp);
          }
          auto it = cache_->find(*key, AccessMode::kWrite);
          if (it == nullptr) {
            ++stats.getMiss;
            ++stats.updateMiss;
            break;
          }
          cache_->updateItemRecordVersion(it);
          break;
        }
        default:
          throw std::runtime_error(folly::sformat(
              "invalid operation generated: {}", static_cast<int>(op)));
        }

        // Save the id before notifying: it is passed to the next getReq() call.
        lastRequestId = req.requestId;
        if (req.requestId) {
          // req might be deleted after calling notifyResult()
          wg_->notifyResult(*req.requestId, result);
        }
      } catch (const cachebench::EndOfTrace&) {
        // The workload generator has no more requests; stop stressing.
        break;
      }
    }
    wg_->markFinish();
  }