cachelib/cachebench/runner/CacheStressor.h (377 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <folly/Random.h> #include <folly/TokenBucket.h> #include <atomic> #include <cstddef> #include <iostream> #include <memory> #include <thread> #include <unordered_set> #include "cachelib/cachebench/cache/Cache.h" #include "cachelib/cachebench/cache/TimeStampTicker.h" #include "cachelib/cachebench/runner/Stressor.h" #include "cachelib/cachebench/util/Config.h" #include "cachelib/cachebench/util/Exceptions.h" #include "cachelib/cachebench/util/Parallel.h" #include "cachelib/cachebench/util/Request.h" #include "cachelib/cachebench/workload/GeneratorBase.h" namespace facebook { namespace cachelib { namespace cachebench { // Implementation of stressor that uses a workload generator to stress an // instance of the cache. All item's value in CacheStressor follows CacheValue // schema, which contains a few integers for sanity checks use. So it is invalid // to use item.getMemory and item.getSize APIs. template <typename Allocator> class CacheStressor : public Stressor { public: using CacheT = Cache<Allocator>; using Key = typename CacheT::Key; using ItemHandle = typename CacheT::ItemHandle; // @param cacheConfig the config to instantiate the cache instance // @param config stress test config // @param generator workload generator CacheStressor(CacheConfig cacheConfig, StressorConfig config, std::unique_ptr<GeneratorBase>&& generator) : config_(std::move(config)), throughputStats_(config_.numThreads), wg_(std::move(generator)), hardcodedString_(genHardcodedString()), endTime_{std::chrono::system_clock::time_point::max()} { // if either consistency check is enabled or if we want to move // items during slab release, we want readers and writers of chained // allocs to be synchronized typename CacheT::ChainedItemMovingSync movingSync; if (config_.usesChainedItems() && (cacheConfig.moveOnSlabRelease || config_.checkConsistency)) { lockEnabled_ = true; struct CacheStressSyncObj : public CacheT::SyncObj { std::unique_lock<folly::SharedMutex> lock; CacheStressSyncObj(CacheStressor& s, std::string itemKey) : lock{s.chainedItemAcquireUniqueLock(itemKey)} {} }; movingSync = [this](typename CacheT::Item::Key key) { return std::make_unique<CacheStressSyncObj>(*this, key.str()); }; } if (cacheConfig.useTraceTimeStamp && cacheConfig.tickerSynchingSeconds > 0) { // When using trace based replay for generating the workload, // TimeStampTicker allows syncing the notion of time between the // cache and the workload generator based on timestamps in the trace. ticker_ = std::make_shared<TimeStampTicker>( config.numThreads, cacheConfig.tickerSynchingSeconds, [wg = wg_.get()](double elapsedSecs) { wg->renderWindowStats(elapsedSecs, std::cout); }); cacheConfig.ticker = ticker_; } cache_ = std::make_unique<CacheT>(cacheConfig, movingSync); if (config_.opPoolDistribution.size() > cache_->numPools()) { throw std::invalid_argument(folly::sformat( "more pools specified in the test than in the cache. " "test: {}, cache: {}", config_.opPoolDistribution.size(), cache_->numPools())); } if (config_.keyPoolDistribution.size() != cache_->numPools()) { throw std::invalid_argument(folly::sformat( "different number of pools in the test from in the cache. " "test: {}, cache: {}", config_.keyPoolDistribution.size(), cache_->numPools())); } if (config_.checkConsistency) { cache_->enableConsistencyCheck(wg_->getAllKeys()); } if (config_.opRatePerSec > 0) { rateLimiter_ = std::make_unique<folly::BasicTokenBucket<>>( config_.opRatePerSec, config_.opRatePerSec); } } ~CacheStressor() override { finish(); } // Start the stress test by spawning the worker threads and waiting for them // to finish the stress operations. void start() override { { std::lock_guard<std::mutex> l(timeMutex_); startTime_ = std::chrono::system_clock::now(); } std::cout << folly::sformat("Total {:.2f}M ops to be run", config_.numThreads * config_.numOps / 1e6) << std::endl; stressWorker_ = std::thread([this] { std::vector<std::thread> workers; for (uint64_t i = 0; i < config_.numThreads; ++i) { workers.push_back( std::thread([this, throughputStats = &throughputStats_.at(i)]() { stressByDiscreteDistribution(*throughputStats); })); } for (auto& worker : workers) { worker.join(); } { std::lock_guard<std::mutex> l(timeMutex_); endTime_ = std::chrono::system_clock::now(); } }); } // Block until all stress workers are finished. void finish() override { if (stressWorker_.joinable()) { stressWorker_.join(); } wg_->markShutdown(); cache_->clearCache(config_.maxInvalidDestructorCount); } // abort the stress run by indicating to the workload generator and // delegating to the base class abort() to stop the test. void abort() override { wg_->markShutdown(); Stressor::abort(); } // obtain stats from the cache instance. Stats getCacheStats() const override { return cache_->getStats(); } // obtain aggregated throughput stats for the stress run so far. ThroughputStats aggregateThroughputStats() const override { ThroughputStats res{}; for (const auto& stats : throughputStats_) { res += stats; } return res; } void renderWorkloadGeneratorStats(uint64_t elapsedTimeNs, std::ostream& out) const override { wg_->renderStats(elapsedTimeNs, out); } void renderWorkloadGeneratorStats( uint64_t elapsedTimeNs, folly::UserCounters& counters) const override { wg_->renderStats(elapsedTimeNs, counters); } uint64_t getTestDurationNs() const override { std::lock_guard<std::mutex> l(timeMutex_); return std::chrono::nanoseconds{ std::min(std::chrono::system_clock::now(), endTime_) - startTime_} .count(); } private: static std::string genHardcodedString() { const std::string s = "The quick brown fox jumps over the lazy dog. "; std::string val; for (int i = 0; i < 4 * 1024 * 1024; i += s.size()) { val += s; } return val; } folly::SharedMutex& getLock(Key key) { auto bucket = MurmurHash2{}(key.data(), key.size()) % locks_.size(); return locks_[bucket]; } // TODO maintain state on whether key has chained allocs and use it to only // lock for keys with chained items. auto chainedItemAcquireSharedLock(Key key) { using Lock = std::shared_lock<folly::SharedMutex>; return lockEnabled_ ? Lock{getLock(key)} : Lock{}; } auto chainedItemAcquireUniqueLock(Key key) { using Lock = std::unique_lock<folly::SharedMutex>; return lockEnabled_ ? Lock{getLock(key)} : Lock{}; } // populate the input item handle according to the stress setup. void populateItem(ItemHandle& handle) { if (!config_.populateItem) { return; } XDCHECK(handle); XDCHECK_LE(cache_->getSize(handle), 4ULL * 1024 * 1024); if (cache_->consistencyCheckEnabled()) { cache_->setUint64ToItem(handle, folly::Random::rand64(rng)); } else { cache_->setStringItem(handle, hardcodedString_); } } // Runs a number of operations on the cache allocator. The actual // operations and key/value used are determined by the workload generator // initialized. // // Throughput and Hit/Miss rates are tracked here as well // // @param stats Throughput stats void stressByDiscreteDistribution(ThroughputStats& stats) { std::mt19937_64 gen(folly::Random::rand64()); std::discrete_distribution<> opPoolDist(config_.opPoolDistribution.begin(), config_.opPoolDistribution.end()); const uint64_t opDelayBatch = config_.opDelayBatch; const uint64_t opDelayNs = config_.opDelayNs; const std::chrono::nanoseconds opDelay(opDelayNs); const bool needDelay = opDelayBatch != 0 && opDelayNs != 0; uint64_t opCounter = 0; auto throttleFn = [&] { if (needDelay && ++opCounter == opDelayBatch) { opCounter = 0; std::this_thread::sleep_for(opDelay); } // Limit the rate if specified. limitRate(); }; std::optional<uint64_t> lastRequestId = std::nullopt; for (uint64_t i = 0; i < config_.numOps && cache_->getInconsistencyCount() < config_.maxInconsistencyCount && cache_->getInvalidDestructorCount() < config_.maxInvalidDestructorCount && !cache_->isNvmCacheDisabled() && !shouldTestStop(); ++i) { try { // at the end of every operation, throttle per the config. SCOPE_EXIT { throttleFn(); }; // detect refcount leaks when run in debug mode. #ifndef NDEBUG auto checkCnt = [](int cnt) { if (cnt != 0) { throw std::runtime_error(folly::sformat("Refcount leak {}", cnt)); } }; checkCnt(cache_->getHandleCountForThread()); SCOPE_EXIT { checkCnt(cache_->getHandleCountForThread()); }; #endif ++stats.ops; const auto pid = static_cast<PoolId>(opPoolDist(gen)); const Request& req(getReq(pid, gen, lastRequestId)); OpType op = req.getOp(); const std::string* key = &(req.key); std::string oneHitKey; if (op == OpType::kLoneGet || op == OpType::kLoneSet) { oneHitKey = Request::getUniqueKey(); key = &oneHitKey; } OpResultType result(OpResultType::kNop); switch (op) { case OpType::kLoneSet: case OpType::kSet: { auto lock = chainedItemAcquireUniqueLock(*key); result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs, req.admFeatureMap); break; } case OpType::kLoneGet: case OpType::kGet: { ++stats.get; auto slock = chainedItemAcquireSharedLock(*key); auto xlock = decltype(chainedItemAcquireUniqueLock(*key)){}; if (ticker_) { ticker_->updateTimeStamp(req.timestamp); } // TODO currently pure lookaside, we should // add a distribution over sequences of requests/access patterns // e.g. get-no-set and set-no-get cache_->recordAccess(*key); auto it = cache_->find(*key, AccessMode::kRead); if (it == nullptr) { ++stats.getMiss; result = OpResultType::kGetMiss; if (config_.enableLookaside) { // allocate and insert on miss // upgrade access privledges, (lock_upgrade is not // appropriate here) slock = {}; xlock = chainedItemAcquireUniqueLock(*key); setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs, req.admFeatureMap); } } else { result = OpResultType::kGetHit; } break; } case OpType::kDel: { ++stats.del; auto lock = chainedItemAcquireUniqueLock(*key); auto res = cache_->remove(*key); if (res == CacheT::RemoveRes::kNotFoundInRam) { ++stats.delNotFound; } break; } case OpType::kAddChained: { ++stats.get; auto lock = chainedItemAcquireUniqueLock(*key); auto it = cache_->find(*key, AccessMode::kRead); if (!it) { ++stats.getMiss; ++stats.set; it = cache_->allocate(pid, *key, *(req.sizeBegin), req.ttlSecs); if (!it) { ++stats.setFailure; break; } populateItem(it); cache_->insertOrReplace(it); } XDCHECK(req.sizeBegin + 1 != req.sizeEnd); bool chainSuccessful = false; for (auto j = req.sizeBegin + 1; j != req.sizeEnd; j++) { ++stats.addChained; const auto size = *j; auto child = cache_->allocateChainedItem(it, size); if (!child) { ++stats.addChainedFailure; continue; } chainSuccessful = true; populateItem(child); cache_->addChainedItem(it, std::move(child)); } if (chainSuccessful && cache_->consistencyCheckEnabled()) { cache_->trackChainChecksum(it); } break; } case OpType::kUpdate: { ++stats.get; ++stats.update; auto lock = chainedItemAcquireUniqueLock(*key); if (ticker_) { ticker_->updateTimeStamp(req.timestamp); } auto it = cache_->find(*key, AccessMode::kWrite); if (it == nullptr) { ++stats.getMiss; ++stats.updateMiss; break; } cache_->updateItemRecordVersion(it); break; } default: throw std::runtime_error( folly::sformat("invalid operation generated: {}", (int)op)); break; } lastRequestId = req.requestId; if (req.requestId) { // req might be deleted after calling notifyResult() wg_->notifyResult(*req.requestId, result); } } catch (const cachebench::EndOfTrace& ex) { break; } } wg_->markFinish(); } // inserts key into the cache if the admission policy also indicates the // key is worthy to be cached. // // @param pid pool id to insert the key // @param stats reference to the stats structure. // @param key the key to be inserted // @param size size of the cache value // @param ttlSecs ttl for the value // @param featureMap feature map for admission policy decisions. OpResultType setKey( PoolId pid, ThroughputStats& stats, const std::string* key, size_t size, uint32_t ttlSecs, const std::unordered_map<std::string, std::string>& featureMap) { // check the admission policy first, and skip the set operation // if the policy returns false if (config_.admPolicy && !config_.admPolicy->accept(featureMap)) { return OpResultType::kSetSkip; } ++stats.set; auto it = cache_->allocate(pid, *key, size, ttlSecs); if (it == nullptr) { ++stats.setFailure; return OpResultType::kSetFailure; } else { populateItem(it); cache_->insertOrReplace(it); return OpResultType::kSetSuccess; } } // fetch a request from the workload generator for a particular pool // @param pid the pool id chosen for the request. // @param gen the thread local random number generator to be // fed // to the workload generator for constructing the // request. // @param lastRequestId optional information about the last request id // that // was given to this thread by the workload // generator. This is used to provide continuity by // some generator implementations. const Request& getReq(const PoolId& pid, std::mt19937_64& gen, std::optional<uint64_t>& lastRequestId) { while (true) { const Request& req(wg_->getReq(pid, gen, lastRequestId)); if (config_.checkConsistency && cache_->isInvalidKey(req.key)) { continue; } return req; } } void limitRate() { if (!rateLimiter_) { return; } rateLimiter_->consumeWithBorrowAndWait(1); } const StressorConfig config_; // config for the stress run std::vector<ThroughputStats> throughputStats_; // thread local stats std::unique_ptr<GeneratorBase> wg_; // workload generator // locks when using chained item and moving. std::array<folly::SharedMutex, 1024> locks_; // if locking is enabled. std::atomic<bool> lockEnabled_{false}; // memorize rng to improve random performance folly::ThreadLocalPRNG rng; // string used for generating random payloads const std::string hardcodedString_; std::unique_ptr<CacheT> cache_; // Ticker that syncs the time according to trace timestamp. std::shared_ptr<TimeStampTicker> ticker_; // main stressor thread std::thread stressWorker_; // mutex to protect reading the timestamps. mutable std::mutex timeMutex_; // start time for the stress test std::chrono::time_point<std::chrono::system_clock> startTime_; // time when benchmark finished. This is set once the benchmark finishes std::chrono::time_point<std::chrono::system_clock> endTime_; // Token bucket used to limit the operations per second. std::unique_ptr<folly::BasicTokenBucket<>> rateLimiter_; }; } // namespace cachebench } // namespace cachelib } // namespace facebook