cachelib/cachebench/runner/IntegrationStressor.h (127 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "cachelib/cachebench/runner/Stressor.h"
#include "cachelib/datatype/Map.h"
#include "cachelib/datatype/RangeMap.h"
namespace facebook {
namespace cachelib {
namespace cachebench {
namespace detail {
struct TestValue {
static std::unique_ptr<TestValue> create(uint32_t val, uint32_t size) {
return std::unique_ptr<TestValue>{new (size) TestValue(val, size)};
}
static void* operator new(size_t /* unused */, size_t actualSize) {
void* ptr = malloc(actualSize);
XDCHECK(ptr != nullptr);
return ptr;
}
static void operator delete(void* ptr) { free(ptr); }
TestValue(uint32_t val, uint32_t size) : size_{size}, val_{val} {}
uint32_t getStorageSize() const { return size_; }
// Actual size of the test value.
uint32_t size_;
// Value we will be using to ensure correctness in the test.
uint32_t val_;
};
} // namespace detail
// This test case is designed to accumulate a huge amount of outstanding
// refcounts on a single item in order to test if cache can run fine when
// we run into refcount overflow problems.
class HighRefcountStressor : public Stressor {
public:
// @param cacheConfig configuration for the cache.
// @param numOps number of ops per thread.
HighRefcountStressor(const CacheConfig& cacheConfig, uint64_t numOps);
Stats getCacheStats() const override { return cache_->getStats(); }
// Only number of operations are reported
ThroughputStats aggregateThroughputStats() const override {
ThroughputStats stats;
stats.ops = ops_;
return stats;
}
uint64_t getTestDurationNs() const override {
return std::chrono::nanoseconds{endTime_ - startTime_}.count();
}
void start() override;
void finish() override { testThread_.join(); }
private:
void testLoop();
static const uint32_t kNumThreads = 10'000;
const uint64_t numOpsPerThread_{};
std::chrono::time_point<std::chrono::system_clock> startTime_;
std::chrono::time_point<std::chrono::system_clock> endTime_;
std::atomic<uint64_t> ops_{0};
std::unique_ptr<Cache<LruAllocator>> cache_;
std::thread testThread_;
};
// This test case populates a cachelib::Map and tries to expand it to a very
// large size and then remove some entries at random and try to compact. It
// will read the entries of the map by iterating through them and try to
// verify they do exist by looking them up via the map's hash table.
class CachelibMapStressor : public Stressor {
public:
CachelibMapStressor(const CacheConfig& cacheConfig, uint64_t numOps);
Stats getCacheStats() const override { return cache_->getStats(); }
// Only number of operations are reported
ThroughputStats aggregateThroughputStats() const override {
ThroughputStats stats;
stats.ops = ops_;
return stats;
}
uint64_t getTestDurationNs() const override {
return std::chrono::nanoseconds{endTime_ - startTime_}.count();
}
void start() override;
void finish() override { testThread_.join(); }
private:
using CacheType = Cache<LruAllocator>;
using TestMap = cachelib::Map<uint32_t, detail::TestValue, CacheType>;
// Add entries to the map
static void populate(TestMap& map);
// Randomly delete some elements
static void pokeHoles(TestMap& map);
// Read entries from the map by iterating and verify they can be looked up
static void readEntries(TestMap& map);
void testLoop();
void lockExclusive(CacheType::Key key) { getLock(key).lock(); }
void unlockExclusive(CacheType::Key key) { getLock(key).unlock(); }
folly::SharedMutex& getLock(CacheType::Key key) {
auto bucket = MurmurHash2{}(key.data(), key.size()) % locks_.size();
return locks_[bucket];
}
// Test configurations.
static const uint32_t kMapSizeUpperbound = 10'000;
static const uint32_t kMapDeletionRate = 5;
static const uint32_t kMapShrinkSize = 100;
static const uint32_t kMapEntryValueSizeMin = 1000;
static const uint32_t kMapEntryValueSizeMax = 2000;
static const uint32_t kMapInsertionBatchMin = 1000;
static const uint32_t kMapInsertionBatchMax = 2000;
const uint64_t numOpsPerThread_{};
std::chrono::time_point<std::chrono::system_clock> startTime_;
std::chrono::time_point<std::chrono::system_clock> endTime_;
std::atomic<uint64_t> ops_{0};
std::array<std::string, 100> keys_;
std::unique_ptr<CacheType> cache_;
std::array<folly::SharedMutex, 100> locks_;
std::thread testThread_;
};
// This test case populates a cachelib::Map and tries to expand it to a very
// large size and then remove some entries at random and try to compact. It
// will read the entries of the map by iterating through them and try to
// verify they do exist by looking them up via the map's index.
class CachelibRangeMapStressor : public Stressor {
public:
CachelibRangeMapStressor(const CacheConfig& cacheConfig, uint64_t numOps);
Stats getCacheStats() const override { return cache_->getStats(); }
// Only number of operations are reported
ThroughputStats aggregateThroughputStats() const override {
ThroughputStats stats;
stats.ops = ops_;
return stats;
}
uint64_t getTestDurationNs() const override {
return std::chrono::nanoseconds{endTime_ - startTime_}.count();
}
void start() override;
void finish() override { testThread_.join(); }
private:
using CacheType = Cache<LruAllocator>;
using TestMap = cachelib::RangeMap<uint32_t, detail::TestValue, CacheType>;
// Add entries to the map
static void populate(TestMap& map);
// Randomly delete some elements
static void pokeHoles(TestMap& map);
// Read entries from the map by iterating and verify they can be looked up
static void readEntries(TestMap& map);
void testLoop();
void lockExclusive(CacheType::Key key) { getLock(key).lock(); }
void unlockExclusive(CacheType::Key key) { getLock(key).unlock(); }
folly::SharedMutex& getLock(CacheType::Key key) {
auto bucket = MurmurHash2{}(key.data(), key.size()) % locks_.size();
return locks_[bucket];
}
// Test configurations.
static const uint32_t kMapSizeUpperbound = 10'000;
static const uint32_t kMapDeletionRate = 5;
static const uint32_t kMapShrinkSize = 100;
static const uint32_t kMapEntryValueSizeMin = 1000;
static const uint32_t kMapEntryValueSizeMax = 2000;
static const uint32_t kMapInsertionBatchMin = 1000;
static const uint32_t kMapInsertionBatchMax = 2000;
const uint64_t numOpsPerThread_{};
std::chrono::time_point<std::chrono::system_clock> startTime_;
std::chrono::time_point<std::chrono::system_clock> endTime_;
std::atomic<uint64_t> ops_{0};
std::array<std::string, 100> keys_;
std::unique_ptr<CacheType> cache_;
std::array<folly::SharedMutex, 100> locks_;
std::thread testThread_;
};
} // namespace cachebench
} // namespace cachelib
} // namespace facebook