// include/core/CCompressedLfuCache.h
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_core_CCompressedLfuCache_h
#define INCLUDED_ml_core_CCompressedLfuCache_h
#include <core/CCompressedDictionary.h>
#include <core/CLogger.h>
#include <core/CMemoryDefStd.h>
#include <core/CPersistUtils.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/RestoreMacros.h>

#include <boost/unordered_map.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <shared_mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace ml {
namespace core {
namespace compressed_lfu_cache_detail {
//! \brief Implements a memory limited least frequently used cache.
//!
//! IMPLEMENTATION DECISIONS:\n
//! To make the implementation less complex we assume cached values are small
//! compared to the memory budget.
//
//! In order to estimate item counts we use the Space-Saving algorithm described
//! in "Efficient Computation of Frequent and Top-k Elements in Data Streams",
//! Metwally et al.
//!
//! The thread safety is delegated to the derived types which must implement the
//! read and write guards. We do not assume that any locks we take are recursive
//! so have the policy that:
//! -# All public member functions implement taking read and write locks
//! unless commented otherwise,
//! -# No internal member functions take any locks,
//! -# No member functions calls a public member function.
//!
//! \tparam KEY The key type which must support conversion to CCompressedDictionary::CWord.
//! \tparam VALUE The value type which should implement memory estimation support.
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
class CCompressedLfuCache {
public:
using TDictionary = CCompressedDictionary<COMPRESSED_KEY_BITS / 64>;
using TCompressedKey = typename TDictionary::CWord;
using TCompressKey = std::function<TCompressedKey(const TDictionary&, const KEY&)>;
using TComputeValueCallback = std::function<std::optional<VALUE>(KEY)>;
using TReadValueCallback = std::function<void(const VALUE&, bool)>;
public:
// Construction is only exposed to derived types.
virtual ~CCompressedLfuCache() = default;
CCompressedLfuCache(const CCompressedLfuCache&) = delete;
CCompressedLfuCache& operator=(const CCompressedLfuCache&) = delete;
//! Resize the cache so that it uses no more than \p maximumMemory.
//!
//! \note This simply updates the limit if \p maximumMemory is larger than the
//! current memory usage. Otherwise, it discards items.
void resize(std::size_t maximumMemory) {
this->guardWrite(DONT_TIME_OUT, [&] {
m_MaximumMemory = maximumMemory;
this->computeMemoryParameters();
// Ensure the cache is consuming no more than the current memory limit.
while (this->memoryUsage() > m_MaximumMemory) {
auto itemToEvict = m_ItemStats.begin();
m_RemovedCount += itemToEvict->count();
this->removeFromCache(itemToEvict);
this->maybeReallocateCache();
}
});
}
//! Lookup an item with \p key in the cache or else fall back to computing.
//!
//! \p readValue will always be called even if writing to the cache fails
//! with the exceptions listed below.
//! \warning If \p computeValue fails to produce a value (returns std::nullopt)
//! or a null value is read from the cache then \p readValue will not be called.
//| Null values produced by \p computeValue are not written to the cache.
//!
//! \param[in] key The item key.
//! \param[in] computeValue Computes the value in the case of a cache miss.
//! \param[in] readValue Processes the value.
//! \return True if there was a cache hit.
//! \note For thread safe implementations we support timing out waiting for
//! a lock since we can make progress even we skip reading or writing from
//! the cache. This allows the caller to control the maximum latency on top
//! of \p computeValue.
//! \note Reading the result takes place under a lock so cut the work done
//! in \p readValue to the minimum possible.
//! \warning The value reference is only guaranteed to survive for the duration
//! of \p readValue.
//! \warning \p key is passed by value so don't forget to move it into this
//! function if possible.
//! \warning \p computeValue and \p readValue must not call functions on the
//! cache.
bool lookup(KEY key,
const TComputeValueCallback& computeValue,
const TReadValueCallback& readValue) {
auto compressedKey = m_CompressKey(m_Dictionary, key);
if (this->guardRead(TIME_OUT, [&] {
++m_NumberLookups;
auto hit = this->hit(compressedKey);
if (hit != nullptr) {
++m_NumberHits;
readValue(*hit, true);
return true;
}
return false;
})) {
if (this->guardWrite(TIME_OUT, [&] {
this->incrementCount(compressedKey);
}) == false) {
++m_LostCount;
}
return true;
}
auto value = computeValue(std::move(key));
if (!value) {
return false;
}
std::size_t itemMemoryUsage{memory::dynamicSize(*value)};
if (this->guardWrite(TIME_OUT, [&] {
// It is possible that two values with the same key check the cache
// before either takes the write lock. So check if this is already
// in the cache before going any further.
if (m_ItemCache.find(compressedKey) != m_ItemCache.end()) {
readValue(*value, true);
this->incrementCount(compressedKey);
return;
}
// Update the item counts using the Space-Saving algorithm.
std::size_t count{1};
std::size_t lastEvictedCount{0};
while (this->full(itemMemoryUsage)) {
auto itemToEvict = m_ItemStats.begin();
// It's possible that the cache is empty yet isn't big
// enough to hold this new item.
if (itemToEvict == m_ItemStats.end()) {
readValue(*value, false);
return;
}
m_RemovedCount += lastEvictedCount;
lastEvictedCount = itemToEvict->count();
this->removeFromCache(itemToEvict);
}
readValue(this->insert(compressedKey, *value, itemMemoryUsage,
count + lastEvictedCount),
false);
}) == false) {
++m_LostCount;
// guardWrite failed to acquire the lock but the potentially
// expensive work of the compute callback is complete.
// Return the computed value even if it has not been written
// to the cache.
readValue(*value, false);
}
return false;
}
void clear() {
this->guardWrite(DONT_TIME_OUT, [&] {
m_NumberLookups = 0;
m_NumberHits = 0;
m_LostCount = 0;
m_RemovedCount = 0;
m_ItemsMemoryUsage = 0;
m_ItemCache.clear();
m_ItemStats.clear();
});
}
//! Get the proportion of requests which result in a hit.
double hitFraction() const {
return std::min(static_cast<double>(m_NumberHits.load()) /
static_cast<double>(m_NumberLookups.load()),
1.0);
}
//! Get the number of items currently stored in the cache.
std::size_t size() const {
std::size_t result{std::numeric_limits<std::size_t>::max()};
this->guardRead(DONT_TIME_OUT, [&] {
result = m_ItemCache.size();
return true;
});
return result;
}
//! Get the stats for item with \p key.
//!
//! \return (memory usage, hit count).
//! \note Returns (0, 0) if the item is not in the cache.
std::pair<std::size_t, std::uint64_t> stats(const KEY& key) const {
std::pair<std::size_t, std::uint64_t> result{0, 0};
auto compressedKey = m_CompressKey(m_Dictionary, key);
this->guardRead(DONT_TIME_OUT, [&] {
auto item = m_ItemCache.find(compressedKey);
if (item != m_ItemCache.end()) {
result = std::make_pair(item->second.stats()->memoryUsage(),
item->second.stats()->count());
}
return true;
});
return result;
}
//! Get the amount of memory the cache is consuming.
std::size_t memoryUsage() const {
std::size_t result{std::numeric_limits<std::size_t>::max()};
this->guardRead(DONT_TIME_OUT, [&] {
result = this->unguardedMemoryUsage();
return true;
});
return result;
}
//! Check cache invariants.
//!
//! \return True if all cache invariants hold.
bool checkInvariants() const {
bool result{true};
this->guardRead(DONT_TIME_OUT, [&] {
if (m_ItemCache.size() != m_ItemStats.size()) {
LOG_ERROR(<< "Size mismatch " << m_ItemCache.size() << " vs "
<< m_ItemStats.size() << ".");
result = false;
}
std::size_t itemsMemoryUsage{0};
std::uint64_t totalCount{m_RemovedCount};
for (auto stats = m_ItemStats.begin(); stats != m_ItemStats.end(); ++stats) {
auto item = m_ItemCache.find(stats->key());
if (item == m_ItemCache.end()) {
LOG_ERROR(<< "Missing item '" << stats->key().print() << "'.");
result = false;
} else if (stats != item->second.stats()) {
LOG_ERROR(<< "Link mismatch for '" << stats->key().print() << "'.");
result = false;
}
itemsMemoryUsage += stats->memoryUsage();
totalCount += stats->count();
}
if (itemsMemoryUsage != m_ItemsMemoryUsage) {
LOG_ERROR(<< "Memory mismatch " << itemsMemoryUsage << " vs "
<< m_ItemsMemoryUsage << ".");
result = false;
}
// We may be missing counts if a single addition caused multiple
// evictions, or if updates can time out
if (m_NumberLookups - m_LostCount != totalCount) {
LOG_ERROR(<< "Count mismatch " << m_NumberLookups.load() << " (less "
<< m_LostCount.load() << " lost) vs " << totalCount << ".");
result = false;
}
return true;
});
return result;
}
//! Persist by passing information to \p inserter.
void acceptPersistInserter(CStatePersistInserter& inserter) const {
this->guardRead(DONT_TIME_OUT, [&] {
CPersistUtils::persist(BUCKET_COUNT_SEQUENCE_TAG, m_BucketCountSequence, inserter);
CPersistUtils::persist(ITEM_CACHE_TAG, m_ItemCache, inserter);
CPersistUtils::persist(ITEMS_MEMORY_USAGE_TAG, m_ItemsMemoryUsage, inserter);
CPersistUtils::persist(ITEM_STATS_TAG, m_ItemStats, inserter);
CPersistUtils::persist(NUMBER_HITS_TAG, m_NumberHits, inserter);
CPersistUtils::persist(NUMBER_LOOKUPS_TAG, m_NumberLookups, inserter);
return true;
});
}
//! Populate the object from serialized data.
//!
//! \warning This is not thread safe since we expect to restore before the
//! cache is being used.
bool acceptRestoreTraverser(CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE_WITH_UTILS(BUCKET_COUNT_SEQUENCE_TAG, m_BucketCountSequence)
RESTORE_WITH_UTILS(ITEM_CACHE_TAG, m_ItemCache)
RESTORE_WITH_UTILS(ITEMS_MEMORY_USAGE_TAG, m_ItemsMemoryUsage)
RESTORE_WITH_UTILS(ITEM_STATS_TAG, m_ItemStats)
RESTORE_WITH_UTILS(NUMBER_HITS_TAG, m_NumberHits)
RESTORE_WITH_UTILS(NUMBER_LOOKUPS_TAG, m_NumberLookups)
} while (traverser.next());
for (auto stats = m_ItemStats.begin(); stats != m_ItemStats.end(); ++stats) {
m_ItemCache[stats->key()].stats(stats);
}
this->checkRestoredInvariants();
return true;
}
protected:
using TReadFunc = std::function<bool()>;
using TWriteFunc = std::function<void()>;
protected:
CCompressedLfuCache(std::size_t maximumMemory, TCompressKey compressKey)
: m_MaximumMemory{maximumMemory}, m_CompressKey{std::move(compressKey)} {
this->computeMemoryParameters();
}
private:
//! \brief A cache item hit frequency and memory usage.
class CCacheItemStats {
public:
static constexpr bool dynamicSizeAlwaysZero() { return true; }
public:
CCacheItemStats() = default;
CCacheItemStats(const TCompressedKey& key, std::size_t memoryUsage, std::size_t count)
: m_Key{key}, m_MemoryUsage{memoryUsage}, m_Count{count} {}
bool operator<(const CCacheItemStats& rhs) const {
// Break ties by _increasing_ memory usage so we evict largest items first.
return m_Count < rhs.count() ||
(m_Count == rhs.count() && m_MemoryUsage > rhs.memoryUsage());
}
TCompressedKey key() const { return m_Key; }
std::size_t memoryUsage() const { return m_MemoryUsage; }
std::size_t count() const { return m_Count; }
void count(std::size_t count) { m_Count = count; }
void acceptPersistInserter(CStatePersistInserter& inserter) const {
CPersistUtils::persist(COUNT_TAG, m_Count, inserter);
CPersistUtils::persist(KEY_TAG, m_Key, inserter);
CPersistUtils::persist(MEMORY_USAGE_TAG, m_MemoryUsage, inserter);
}
bool acceptRestoreTraverser(CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE_WITH_UTILS(COUNT_TAG, m_Count)
RESTORE_WITH_UTILS(KEY_TAG, m_Key)
RESTORE_WITH_UTILS(MEMORY_USAGE_TAG, m_MemoryUsage)
} while (traverser.next());
return true;
}
private:
static const std::string COUNT_TAG;
static const std::string KEY_TAG;
static const std::string MEMORY_USAGE_TAG;
private:
TCompressedKey m_Key;
std::size_t m_MemoryUsage{0};
std::uint64_t m_Count{0};
};
using TCacheItemStatsSet = std::multiset<CCacheItemStats>;
using TCacheItemStatsSetItr = typename TCacheItemStatsSet::iterator;
//! \brief A cache item wrapper which holds a reference to its stats.
class CCacheItem {
public:
//! We purposely disable direct memory estimation for cache items because
//! we want to avoid quadratic complexity using core::memory::dynamicSize.
//! Instead we estimate memory usage for each item we add and remove from
//! the cache and maintain a running total.
static constexpr bool dynamicSizeAlwaysZero() { return true; }
public:
CCacheItem() = default;
explicit CCacheItem(VALUE value) : m_Value{std::move(value)} {}
const VALUE& value() const { return m_Value; }
TCacheItemStatsSetItr stats() const { return m_Stats; }
void stats(TCacheItemStatsSetItr stats) { m_Stats = stats; }
void acceptPersistInserter(CStatePersistInserter& inserter) const {
CPersistUtils::persist(VALUE_TAG, m_Value, inserter);
}
bool acceptRestoreTraverser(CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE_WITH_UTILS(VALUE_TAG, m_Value)
} while (traverser.next());
return true;
}
private:
static const std::string VALUE_TAG;
private:
VALUE m_Value;
TCacheItemStatsSetItr m_Stats;
};
using TSizeVec = std::vector<std::size_t>;
using TCompressedKeyCacheItemUMap = typename TDictionary::template TWordTUMap<CCacheItem>;
using TCompressedKeyCacheItemUMapCItr = typename TCompressedKeyCacheItemUMap::const_iterator;
static constexpr bool TIME_OUT{true};
static constexpr bool DONT_TIME_OUT{false};
private:
virtual bool guardRead(bool timeOut, TReadFunc func) const = 0;
virtual bool guardWrite(bool timeOut, TWriteFunc func) = 0;
virtual bool updatesCanTimeOut() const = 0;
void maybeReallocateCache() {
auto shrunk = std::lower_bound(m_BucketCountSequence.begin(),
m_BucketCountSequence.end(),
m_ItemCache.bucket_count());
if (shrunk != m_BucketCountSequence.begin()) {
--shrunk;
double shrunkLoadFactor{static_cast<double>(m_ItemCache.bucket_count()) /
static_cast<double>(*shrunk) *
static_cast<double>(m_ItemCache.load_factor())};
if (shrunkLoadFactor < static_cast<double>(m_ItemCache.max_load_factor())) {
// The only way we can actually reclaim memory is to copy
// the contents of the cache into a new map.
TCompressedKeyCacheItemUMap copy{*shrunk - 1};
for (auto & [ compressedKey, value ] : m_ItemCache) {
copy.emplace(compressedKey, std::move(value));
}
m_ItemCache.swap(copy);
}
}
}
const VALUE* hit(const TCompressedKey& key) const {
auto item = m_ItemCache.find(key);
return item != m_ItemCache.end() ? &item->second.value() : nullptr;
}
void incrementCount(const TCompressedKey& key) {
auto item = m_ItemCache.find(key);
if (item != m_ItemCache.end()) {
auto node = m_ItemStats.extract(item->second.stats());
node.value().count(node.value().count() + 1);
item->second.stats(m_ItemStats.insert(std::move(node)));
}
}
void removeFromCache(TCacheItemStatsSetItr itemToEvict) {
m_ItemCache.erase(itemToEvict->key());
m_ItemsMemoryUsage -= itemToEvict->memoryUsage();
m_ItemStats.erase(itemToEvict);
}
const VALUE& insert(const TCompressedKey& key,
VALUE value,
std::size_t itemMemoryUsage,
std::size_t count) {
m_ItemsMemoryUsage += itemMemoryUsage;
auto item = m_ItemCache.emplace(key, std::move(value)).first;
auto stats = m_ItemStats.emplace_hint(m_ItemStats.begin(), key,
itemMemoryUsage, count);
item->second.stats(stats);
return item->second.value();
}
bool full(std::size_t itemMemoryUsage) const {
std::size_t memory{this->unguardedMemoryUsage() + itemMemoryUsage +
sizeof(typename TCompressedKeyCacheItemUMap::value_type) +
memory::storageNodeOverhead(m_ItemCache) +
sizeof(typename TCacheItemStatsSet::value_type) +
memory::storageNodeOverhead(m_ItemStats)};
if (m_ItemCache.bucket_count() > 0 && this->needToResizeItemCache()) {
memory += static_cast<std::size_t>(
(static_cast<double>(this->nextItemCacheBucketCount()) /
static_cast<double>(m_ItemCache.bucket_count()) -
1.0) *
static_cast<double>(memory::dynamicSize(m_ItemCache) -
m_ItemCache.size() *
sizeof(typename TCompressedKeyCacheItemUMap::value_type)));
}
return memory > m_MaximumMemory;
}
std::size_t unguardedMemoryUsage() const {
return m_ItemsMemoryUsage + // overheads
memory::dynamicSize(m_ItemCache) + memory::dynamicSize(m_ItemStats) +
memory::dynamicSize(m_BucketCountSequence);
}
bool needToResizeItemCache() const {
return static_cast<double>(m_ItemCache.load_factor()) +
1.0 / static_cast<double>(m_ItemCache.bucket_count()) >
static_cast<double>(m_ItemCache.max_load_factor());
}
std::size_t nextItemCacheBucketCount() const {
return *std::upper_bound(m_BucketCountSequence.begin(),
m_BucketCountSequence.end(), m_ItemCache.bucket_count());
}
void computeMemoryParameters() {
// We expect resizing to cycle through a fixed sequence of bucket counts.
// This deduces and caches that sequence.
typename TDictionary::template TWordTUMap<std::size_t> test;
m_BucketCountSequence.clear();
m_BucketCountSequence.push_back(test.bucket_count());
// Ignore heap memory allocated by VALUE since we only need a lower bound.
std::size_t memoryPerItem{2 * sizeof(TCompressedKey) +
sizeof(std::uint64_t) + sizeof(VALUE)};
for (std::size_t i = 0; memoryPerItem * i < m_MaximumMemory; ++i) {
auto translator = m_Dictionary.translator();
translator.add(i);
test.emplace(translator.word(), i);
if (test.bucket_count() != m_BucketCountSequence.back()) {
m_BucketCountSequence.push_back(test.bucket_count());
}
}
LOG_TRACE(<< "bucket sequence = " << m_BucketCountSequence);
}
void checkRestoredInvariants() const {
VIOLATES_INVARIANT(m_ItemCache.size(), !=, m_ItemStats.size());
for (const auto & [ key, item ] : m_ItemCache) {
VIOLATES_INVARIANT_NO_EVALUATION(item.stats(), ==, m_ItemStats.end());
VIOLATES_INVARIANT_NO_EVALUATION(item.stats(), ==, TCacheItemStatsSetItr{});
VIOLATES_INVARIANT_NO_EVALUATION(key, !=, item.stats()->key());
}
for (const auto& stats : m_ItemStats) {
VIOLATES_INVARIANT_NO_EVALUATION(m_ItemCache.find(stats.key()), ==,
m_ItemCache.end());
}
}
private:
static const std::string BUCKET_COUNT_SEQUENCE_TAG;
static const std::string ITEM_CACHE_TAG;
static const std::string ITEMS_MEMORY_USAGE_TAG;
static const std::string ITEM_STATS_TAG;
static const std::string NUMBER_HITS_TAG;
static const std::string NUMBER_LOOKUPS_TAG;
private:
// Align the lower bits to prevent false sharing with other members. This
// should switch to std::hardware_destructive_interference_size when it's
// fully supported.
alignas(64) std::atomic<std::uint64_t> m_NumberLookups{0};
alignas(64) std::atomic<std::uint64_t> m_NumberHits{0};
//! We can lose count if we fail to obtain a write lock within the timeout
//! period when trying to increment cache item counts.
alignas(64) std::atomic<std::uint64_t> m_LostCount{0};
std::uint64_t m_RemovedCount{0};
std::size_t m_MaximumMemory{0};
std::size_t m_ItemsMemoryUsage{0};
TCompressKey m_CompressKey;
TDictionary m_Dictionary;
TCompressedKeyCacheItemUMap m_ItemCache;
TCacheItemStatsSet m_ItemStats;
TSizeVec m_BucketCountSequence;
};
// clang-format off
// Out-of-line definitions of the persistence tag strings declared in
// CCompressedLfuCache and its nested classes. The tag text is part of the
// persisted state format and must not change.
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::BUCKET_COUNT_SEQUENCE_TAG{"bucket_count_sequence"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::ITEM_CACHE_TAG{"item_cache"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::ITEMS_MEMORY_USAGE_TAG{"items_memory_usage"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::ITEM_STATS_TAG{"item_stats"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::NUMBER_HITS_TAG{"number_hits"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::NUMBER_LOOKUPS_TAG{"number_lookups"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::CCacheItem::VALUE_TAG{"value"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::CCacheItemStats::COUNT_TAG{"count"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::CCacheItemStats::KEY_TAG{"key"};
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS>
const std::string CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>::CCacheItemStats::MEMORY_USAGE_TAG{"memory_usage"};
// clang-format on
}
//! \brief A thread safe memory limited least frequently used cache.
//!
//! DESCRIPTION:\n
//! This provides a memory limited least frequently used cache. The maximum memory
//! the cache will consume is supplied as a parameter. It is optimized for the case
//! that the cache keys are large. We use a specified size hash (in number of bits)
//! for the keys. Using sufficient bits means the chance of a collision is effectively
//! zero for reasonably sized caches (for more details see CCompressedDictionary).
//! For this reason, no safe guards are provided in the case of a hash collision.
//!
//! Example usage caching the result of expensive operation and writing to stdout:
//! \code{.cpp}
//! double myExpensiveOperation(std::vector<double> input);
//! ...
//! core::CConcurrentCompressedLfuCache<std::vector<double>, double> cache{
//!     64 * 1024,
//!     std::chrono::milliseconds{100},
//!     [](const auto& dictionary, const auto& key) {
//!         auto translator = dictionary.translator();
//!         translator.add(key);
//!         return translator.word();
//!     }};
//! for (auto input : inputs) {
//!     core::async(
//!         core::defaultAsyncExecutor(),
//!         [&] {
//!             cache.lookup(
//!                 std::move(input),
//!                 [](auto input_) { return myExpensiveOperation(std::move(input_)); },
//!                 [](const auto& result, bool /*isHit*/) { std::cout << result << std::endl; });
//!         });
//! }
//! \endcode
//!
//! IMPLEMENTATION DECISIONS:\n
//! \see compressed_lfu_cache_detail::CCompressedLfuCache.
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS = 128>
class CConcurrentCompressedLfuCache final
    : public compressed_lfu_cache_detail::CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS> {
public:
    using TBase = compressed_lfu_cache_detail::CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>;
    using TDictionary = typename TBase::TDictionary;
    using TCompressKey = typename TBase::TCompressKey;

public:
    //! \param[in] maximumMemory The maximum memory the cache will consume in bytes.
    //! \param[in] maximumWait The maximum time to wait at a critical section.
    //! \param[in] compressKey Compresses the key. \see CCompressedDictionary for
    //! details.
    CConcurrentCompressedLfuCache(std::size_t maximumMemory,
                                  std::chrono::milliseconds maximumWait,
                                  TCompressKey compressKey)
        // Move rather than copy: compressKey is taken by value precisely so
        // the caller's argument can be forwarded through without a copy.
        : TBase{maximumMemory, std::move(compressKey)}, m_MaximumWait{maximumWait} {}

private:
    using TReadFunc = typename TBase::TReadFunc;
    using TWriteFunc = typename TBase::TWriteFunc;

private:
    //! Run \p func under a shared lock, optionally giving up after m_MaximumWait.
    bool guardRead(bool timeOut, TReadFunc func) const override {
        if (timeOut) {
            std::shared_lock<std::shared_timed_mutex> lock{m_Mutex, m_MaximumWait};
            return lock.owns_lock() && func();
        }
        std::shared_lock<std::shared_timed_mutex> lock{m_Mutex};
        return func();
    }
    //! Run \p func under an exclusive lock, optionally giving up after m_MaximumWait.
    bool guardWrite(bool timeOut, TWriteFunc func) override {
        if (timeOut) {
            std::unique_lock<std::shared_timed_mutex> lock{m_Mutex, m_MaximumWait};
            if (lock.owns_lock()) {
                func();
                return true;
            }
            return false;
        }
        std::unique_lock<std::shared_timed_mutex> lock{m_Mutex};
        func();
        return true;
    }
    bool updatesCanTimeOut() const override { return true; }

private:
    std::chrono::milliseconds m_MaximumWait;
    mutable std::shared_timed_mutex m_Mutex;
};
//! \brief A memory limited least frequently used cache.
//!
//! DESCRIPTION:\n
//! This version is provided to remove all overheads from locking and unlocking
//! mutexes if it is known that the cache is only called from a single thread.
//! Otherwise, its behaviour is identical to CConcurrentCompressedLfuCache.
//!
//! Example usage caching the result of expensive operation and writing to stdout:
//! \code{.cpp}
//! double myExpensiveOperation(std::vector<double> input);
//! ...
//! CCompressedLfuCache<std::vector<double>, double> cache{
//!     64 * 1024,
//!     [](const auto& dictionary, const auto& key) {
//!         auto translator = dictionary.translator();
//!         translator.add(key);
//!         return translator.word();
//!     }};
//! for (auto input : inputs) {
//!     cache.lookup(
//!         std::move(input),
//!         [](auto input_) { return myExpensiveOperation(std::move(input_)); },
//!         [](const auto& result, bool /*isHit*/) { std::cout << result << std::endl; });
//! }
//! \endcode
//!
//! IMPLEMENTATION DECISIONS:\n
//! \see compressed_lfu_cache_detail::CCompressedLfuCache.
template<typename KEY, typename VALUE, std::size_t COMPRESSED_KEY_BITS = 128>
class CCompressedLfuCache final
    : public compressed_lfu_cache_detail::CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS> {
public:
    using TBase = compressed_lfu_cache_detail::CCompressedLfuCache<KEY, VALUE, COMPRESSED_KEY_BITS>;
    using TDictionary = typename TBase::TDictionary;
    using TCompressKey = typename TBase::TCompressKey;

public:
    //! \param[in] maximumMemory The maximum memory the cache will consume in bytes.
    //! \param[in] compressKey Compresses the key. \see CCompressedDictionary for details.
    CCompressedLfuCache(std::size_t maximumMemory, TCompressKey compressKey)
        // Move rather than copy: compressKey is taken by value precisely so
        // the caller's argument can be forwarded through without a copy.
        : TBase{maximumMemory, std::move(compressKey)} {}

private:
    using TReadFunc = typename TBase::TReadFunc;
    using TWriteFunc = typename TBase::TWriteFunc;

private:
    // Single threaded so there is nothing to guard: just run the functions.
    bool guardRead(bool, TReadFunc func) const override { return func(); }
    bool guardWrite(bool, TWriteFunc func) override {
        func();
        return true;
    }
    bool updatesCanTimeOut() const override { return false; }
};
}
}
#endif // INCLUDED_ml_core_CCompressedLfuCache_h