cachelib/experimental/objcache/ObjectCache.h (405 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <scoped_allocator> #include "cachelib/common/PeriodicWorker.h" #include "cachelib/common/Serialization.h" #include "cachelib/experimental/objcache/Allocator.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wconversion" #include "cachelib/experimental/objcache/gen-cpp2/ObjectCachePersistence_types.h" #pragma GCC diagnostic pop namespace facebook { namespace cachelib { namespace detail { // Accessor to expose unmarkNascent() from a item handle template <typename ItemHandle2> void objcacheUnmarkNascent(const ItemHandle2& hdl) { hdl.unmarkNascent(); } } // namespace detail namespace objcache { // Figure out how many bytes we need at the minimum for an object in cache template <typename T> inline uint32_t getTypeSize() { return static_cast<uint32_t>(sizeof(T)); } // Convert an item to a type, starting from after the allocator's metadata template <typename T, typename AllocatorResource> inline T* getType(void* mem) { return reinterpret_cast<T*>( AllocatorResource::getReservedStorage(mem, std::alignment_of<T>())); } // Object handle for an object in cache. This handle owns the resource backing // an object cached in cachelib cache. User should only access an object via // an object handle, or a shared_ptr converted from object handle. template <typename T, typename CacheDescriptor, typename AllocatorResource> class CacheObjectHandle { public: using ItemHandle = typename CacheDescriptor::ItemHandle; CacheObjectHandle() = default; ~CacheObjectHandle() = default; explicit CacheObjectHandle(ItemHandle handle) : handle_{std::move(handle)} { if (handle_) { if (*reinterpret_cast<uint16_t*>(handle_->getMemory()) != 0xbeef) { throw std::invalid_argument(folly::sformat( "This item does not have an allocator associated. Key: {}", handle_->getKey())); } ptr_ = getType<T, AllocatorResource>(handle_->getMemory()); } } CacheObjectHandle(const CacheObjectHandle&) = delete; CacheObjectHandle& operator=(const CacheObjectHandle&) = delete; CacheObjectHandle(CacheObjectHandle&& other) { ptr_ = other.ptr_; handle_ = std::move(other.handle_); other.ptr_ = nullptr; } CacheObjectHandle& operator=(CacheObjectHandle&& other) { if (this != &other) { new (this) CacheObjectHandle(std::move(other)); } return *this; } explicit operator bool() const noexcept { return get() != nullptr; } T* operator->() const noexcept { return get(); } T& operator*() const noexcept { return *get(); } T* get() const noexcept { return ptr_; } // View item handle. This is useful if user wants to work with the // underlying CacheAllocator directly with an ItemHandle const ItemHandle& viewItemHandle() const noexcept { return handle_; } // Moves ownership into a regular item handle. ItemHandle releaseItemHandle() && { // Reset "pointer" since we're converting this into a regular item handle ptr_ = nullptr; return std::move(handle_); } // Moves ownership into a shared_ptr. This is irreversible. // TODO: in the future, we should consider if we want to allow user to // convert shared_ptr back into an ObjectHandle via get_deleter(). // https://en.cppreference.com/w/cpp/memory/shared_ptr/get_deleter std::shared_ptr<T> toSharedPtr() && { CacheObjectHandle objHandle{std::move(*this)}; auto* t = objHandle.get(); return std::shared_ptr<T>( t, [objHandle = std::move(objHandle)](T*) mutable { std::move(objHandle).releaseItemHandle().reset(); }); } private: // Handle to ensure the object remains valid ItemHandle handle_{}; T* ptr_{}; }; template <typename ObjectCache> class ObjectCacheCompactor { public: explicit ObjectCacheCompactor( ObjectCache& objcache, std::unique_ptr<typename ObjectCache::CompactionSyncObj> syncObj) : objcache_{objcache}, syncObj_{std::move(syncObj)} {} template <typename T> void compact(folly::StringPiece key, void* unalignedMem) const { auto poolId = objcache_.getCacheAlloc().getAllocInfo(unalignedMem).poolId; auto* obj = getType<T, typename ObjectCache::AllocatorResource>(unalignedMem); auto newObj = objcache_.template createCompact<T>(poolId, key, *obj); objcache_.insertOrReplace(newObj); } private: // Marking this mutable as we will use non-const method in object cache but // the compactor object itself should be kept as const. ObjectCache& objcache_; std::unique_ptr<typename ObjectCache::CompactionSyncObj> syncObj_; }; template <typename ObjectCache> class ObjectCacheConfig { public: using CacheAllocatorConfig = typename ObjectCache::CacheAlloc::Config; ObjectCacheConfig& setCacheAllocatorConfig(CacheAllocatorConfig config) { cacheAllocatorConfig_ = config; return *this; } CacheAllocatorConfig& getCacheAllocatorConfig() { // User can mutate certain fields in cache allocator config return cacheAllocatorConfig_; } const CacheAllocatorConfig& getCacheAllocatorConfig() const { return cacheAllocatorConfig_; } // If objects have non-trivial destructor semantics, then it is recommended // user should create destructor callback, so cachelib will destroy objects // properly when they are evicted/removed from cache. For example, if the // objects contain any objects allocated on heap, or has any side effects // in its destructor logic, then they should have a destructor callback. // // Example of specifying a destructor callback // auto dcb = [](folly::StringPiece key, void* unalignedMem) { // if (key.startsWith("map_type_")) { // getType<MyMapType>(unalignedMem)->~MyMapType(); // } else if (key.startsWith("my_other_type_")) { // getType<MyOtherType>(unalignedMem)->~MyOtherType(); // } // }; // config.setDestructorCallback(dcb); using DestructorCallback = typename ObjectCache::DestructorCallback; ObjectCacheConfig& setDestructorCallback(DestructorCallback destructorCb) { destructorCb_ = std::move(destructorCb); return *this; } const DestructorCallback& getDestructorCallback() const { return destructorCb_; } // Compaction callback is an asynchrnous mechancism to copy the objects // and compact them in memory footprint throughout the process. This is // useful if user frequently mutates the cache objects. If the objects // are rarely mutated (mostly a read workload), then there's no need to // enable compaction. Compaction also comes with a cpu cost, as we have // to walk the cache and check each object for compaction threshold, and // compact eligible objects. The faster we compact, the more expensive // it will be for CPU. The compaction speed is tunable. Also note that // cachelib will never use more than one full core for compaction. // // Example of specifying a compaction callback // auto ccb = [](folly::StringPiece key, // void* unalignedMem, // const Compactor& compactor) { // if (key.startsWith("map_type_")) { // compactor.compact<MyMapType>(key, unalignedMem); // } else if (key.startsWith("my_other_type_")) { // compactor.compact<MyOtherType>(key, unalignedMem); // } // }; // struct UserCompactionSyncObj : public CompactionSyncObj { // UserCompactionSyncObj(UserLock& lock) : l{lock} {} // std::lock_guard<UserLock> l; // }; // auto compactionSync = [] (folly::StringPiece key) { // return std::unique_ptr<CompactionSyncObj>( // new UserCompactionSyncObj{getLock(key)}); // }; // config.enableCompaction(ccb, compactionSync); using CompactionCallback = typename ObjectCache::CompactionCallback; using CompactionSync = typename ObjectCache::CompactionSync; ObjectCacheConfig& enableCompaction( CompactionCallback compactionCb, CompactionSync compactionSync, std::chrono::milliseconds compactionWork = std::chrono::milliseconds{5}, std::chrono::milliseconds compactionSleep = std::chrono::milliseconds{ 45}) { compactionCb_ = std::move(compactionCb); compactionSync_ = std::move(compactionSync); compactionWork_ = compactionWork; compactionSleep_ = compactionSleep; return *this; } const CompactionCallback& getCompactionCallback() const { return compactionCb_; } const CompactionSync& getCompactionSync() const { return compactionSync_; } const std::chrono::milliseconds getCompactionWorkTime() const { return compactionWork_; } const std::chrono::milliseconds getCompactionSleepTime() const { return compactionSleep_; } // TODO: add comments for persistence using SerializationCallback = typename ObjectCache::SerializationCallback; using DeserializationCallback = typename ObjectCache::DeserializationCallback; ObjectCacheConfig& enablePersistence(SerializationCallback scb, DeserializationCallback dcb) { serializationCallback_ = std::move(scb); deserializationCallback_ = std::move(dcb); return *this; } const SerializationCallback& getSerializationCallback() const { return serializationCallback_; } const DeserializationCallback& getDeserializationCallback() const { return deserializationCallback_; } private: CacheAllocatorConfig cacheAllocatorConfig_; DestructorCallback destructorCb_; CompactionCallback compactionCb_; CompactionSync compactionSync_; std::chrono::milliseconds compactionWork_; std::chrono::milliseconds compactionSleep_; SerializationCallback serializationCallback_; DeserializationCallback deserializationCallback_; }; struct ObjectCacheStats { struct StatsAggregate { uint64_t compactions; }; StatsAggregate getAggregate() { StatsAggregate agg; agg.compactions = compactions.get(); return agg; } AtomicCounter compactions; }; // TODO: Allow user to specify any compatible allocator resource // Today we force a user to specifiy which allocator resource to // use at compile-time. We don't actually need to impose this // restriction since our Allocator is designed to allow different // AllocatorResource to be interchanged as long as they're compatible // with one another. So this restriction should be loosened up. /* An example of using ObjectCache is as following: using LruObjectCache = ObjectCache<CacheDescriptor<LruAllocator>, MonotonicBufferResource<CacheDescriptor<LruAllocator>>>; auto cacheAllocatorPtr = myMethodToCreateCacheAllocator(...); LruObjectCache objcache{cacheAllocatorPtr, ownsCache: true | false}; using Vector = std::vector<int, LruObjectCache::Alloc<int>>; // Create a new vector in cache auto vec = objcache.create<Vector>(poolId, "my obj"); // Insert it objcache.insertOrReplace(vec); // Find it auto vec2 = objcache.find<Vector>("my obj"); // Access it for (int i = 0; i < 10; i++) { vec2->push_back(i); } */ template <typename CacheDescriptor, typename AllocatorRes> class ObjectCache { public: using AllocatorResource = AllocatorRes; using Config = ObjectCacheConfig<ObjectCache>; using CacheAlloc = typename CacheDescriptor::Cache; using Item = typename CacheDescriptor::Item; using ItemHandle = typename CacheDescriptor::ItemHandle; // CompactionSyncObj is an object that holds exclusive access to an item. // Any class that implements CompactionSyncObj can hold a "lock" or any other // synchronization primitive over an item. using CompactionSyncObj = typename CacheAlloc::SyncObj; // CompactionSync should return a synchronization object given a key. On // failure, return an valid sync obj that returns `isValid() == false`. using CompactionSync = std::function<std::unique_ptr<CompactionSyncObj>(folly::StringPiece)>; // Destroy an object when it is evicted/removed from cache. This is an // object's destructor. Similarly, this cannot throw and cachelib will // crash as a result of any exception in this callback. using DestructorCallback = std::function<void(folly::StringPiece key, void* unalignedMem, const typename CacheAlloc::RemoveCbData& data)>; // Compactor is a helper class that exposes only the logic necessary to // compact objects. using Compactor = ObjectCacheCompactor<ObjectCache>; // CompactionCallback is a user-supplied callback that compacts object, // refer to Config::enableCompaction() for how to use this. This function // should not throw. Failure of compaction must not produce any side-effect // to the object we're trying to compact. using CompactionCallback = std::function<void( folly::StringPiece key, void* unalignedMem, const Compactor& cache)>; // SerializationCallback takes an object and serialize it to an iobuf. On // success, return a valid iobuf. Otherwise, return nullptr. using SerializationCallback = std::function<std::unique_ptr<folly::IOBuf>( folly::StringPiece key, void* unalignedMem)>; // DeserializationCallback takes an iobuf and deserialize it back into an // object. On success, return a valid item handle. Otherwise, return an empty // one. User can only create an object via "cache". Other operations are NOT // allowed. // // TODO: replace "cache" with a "CacheCreator" helper, and update comments using DeserializationCallback = std::function<typename ObjectCache::ItemHandle(PoolId poolId, folly::StringPiece key, folly::StringPiece payload, ObjectCache& cache)>; template <typename T> using ObjectHandle = CacheObjectHandle<T, CacheDescriptor, AllocatorResource>; template <typename T> using Alloc = std::scoped_allocator_adaptor<Allocator<T, AllocatorResource>>; // Convert an item handle to an object handle template <typename T> static ObjectHandle<T> toObjectHandle(ItemHandle handle) { return ObjectHandle<T>{std::move(handle)}; } // Convert an item handle to a shared_ptr handle. Note that this incurs // a heap allocation to create the control block for shared_ptr. template <typename T> static std::shared_ptr<T> toSharedPtr(ItemHandle handle) { return ObjectHandle<T>{std::move(handle)}.toSharedPtr(); } // TODO: add shared-mem constructors // Creates an object cache that owns the underlying cache // @param config config to create cache allocator managed by ObjectCache explicit ObjectCache(Config config) : ObjectCache(createCache(config), true) { serializationCallback_ = config.getSerializationCallback(); deserializationCallback_ = config.getDeserializationCallback(); if (config.getCompactionCallback()) { compactionWorker_ = std::make_unique<CompactionWorker>(*this, config.getCompactionCallback(), config.getCompactionSync(), config.getCompactionWorkTime(), config.getCompactionSleepTime()); compactionWorker_->start(std::chrono::seconds{1}); } } // TODO: pass in the configs for compaction, destructor, and persistence // // Creates an object cache. This should be used when user has an existing // CacheAllocator instance and is the process of migrating to ObjectCache. // @param cache pointer to the cache allocator // @param ownsCache whether or not this class should own the underlying // cache allocator ObjectCache(CacheAlloc* cache, bool ownsCache) : cache_{cache}, ownsCache_{ownsCache} {} ~ObjectCache() { if (compactionWorker_) { compactionWorker_->stop(); } if (ownsCache_) { delete cache_; } } // Get the underlying CacheAllocator CacheAlloc& getCacheAlloc() { return *cache_; } void persist(RecordWriter& rw) { XDCHECK(serializationCallback_); for (auto it = cache_->begin(); it != cache_->end(); ++it) { auto iobuf = serializationCallback_(it->getKey(), it->getMemory()); if (!iobuf) { XLOG(ERR) << "Failed to serialize for key: " << it->getKey(); continue; } serialization::Item item; item.poolId().value() = cache_->getAllocInfo(it->getMemory()).poolId; // TODO: we need to actually recover creation and persistence as well // we need to modify the create allocator resource logic to allow // us to pass in creation and expiry times. item.creationTime().value() = it->getCreationTime(); item.expiryTime().value() = it->getExpiryTime(); item.key().value() = it->getKey().str(); item.payload().value().resize(iobuf->length()); std::memcpy( item.payload().value().data(), iobuf->data(), iobuf->length()); rw.writeRecord(Serializer::serializeToIOBuf(item)); } } void recover(RecordReader& rr) { XDCHECK(deserializationCallback_); while (!rr.isEnd()) { auto iobuf = rr.readRecord(); XDCHECK(iobuf); Deserializer deserializer(iobuf->data(), iobuf->data() + iobuf->length()); auto item = deserializer.deserialize<serialization::Item>(); // TODO: support creationTime and expiryTime auto hdl = deserializationCallback_(item.poolId().value(), item.key().value(), item.payload().value(), *this); if (!hdl) { XLOG(ERR) << "Failed to deserialize for key: " << item.key().value(); continue; } cache_->insertOrReplace(hdl); } } // Create a new object backed by cachelib-memory. This behaves similar to // std::make_unique. User can pass in any arguments that T's constructor // can take. Note this API only creates a new object but does not insert it // into cache. User must call insertOrReplace to make this object visible. // // @param poolId Cache pool this object will be allocated from. // @param key Key associated with the object // @param args... Arguments for T's constructor // @return a handle to an object // @throw ObjectCacheAllocationError on allocation error // Any exceptions from within T's constructor template <typename T, typename... Args> ObjectHandle<T> create(PoolId poolId, folly::StringPiece key, Args&&... args) { // TODO: Allow user to specify any compatible allocator resource auto [handle, mbr] = createMonotonicBufferResource<AllocatorResource>( *cache_, poolId, key, getTypeSize<T>() /* reserve minimum space for this object */, 0 /* additional bytes for storage */, std::alignment_of<T>()); new (getType<T, AllocatorResource>(handle->getMemory())) T(std::forward<Args>(args)..., Alloc<char>{mbr}); // We explicitly unmark this handle as nascent so we will trigger the // destructor associated with this item properly in the remove callback detail::objcacheUnmarkNascent(handle); return ObjectHandle<T>{std::move(handle)}; } // Compact an object by copying its content onto a single item. This is // only valid if the oldItem is already managed by cachelib. // TODO: is there a better way to design this API? This easily crashes // if user passes in a non-cachelib managed object. // TODO: document params template <typename T> ObjectHandle<T> createCompact(PoolId poolId, folly::StringPiece key, const T& oldObject) { // TODO: handle when this is bigger than 4MB const uint32_t usedBytes = oldObject.get_allocator() .getAllocatorResource() .viewMetadata() ->usedBytes; // TODO: Allow user to specify any compatible allocator resource auto [handle, mbr] = createMonotonicBufferResource<AllocatorResource>( *cache_, poolId, key, getTypeSize<T>() /* reserve minimum space for this object */, usedBytes /* additional bytes for storage */, std::alignment_of<T>()); new (getType<T, AllocatorResource>(handle->getMemory())) T(oldObject, Alloc<char>{mbr}); // We explicitly unmark this handle as nascent so we will trigger the // destructor associated with this item properly in the remove callback detail::objcacheUnmarkNascent(handle); return ObjectHandle<T>{std::move(handle)}; } // Look up an object in cache // @param key Key associated with the object // @param mode Access mode: kRead or kWrite // @return a handle to the object; nullptr if not found. template <typename T> ObjectHandle<T> find(folly::StringPiece key, AccessMode mode = AccessMode::kRead) { auto handle = cache_->findImpl(key, mode); return toObjectHandle<T>(std::move(handle)); } // Insert an object into cache // @param handle Handle to the object // @return a handle to an existing object that we replaced if present; // nullptr otherwise. template <typename T> ObjectHandle<T> insertOrReplace(const ObjectHandle<T>& handle) { auto oldHandle = cache_->insertOrReplace(handle.viewItemHandle()); return toObjectHandle<T>(std::move(oldHandle)); } // Remove an object from cache. Note that the object may NOT be destroyed // immediately. The destructor will only be called when the last holder // of a handle to the object drops the handle. // @param key Key associated with the object void remove(folly::StringPiece key) { cache_->remove(key); } // Wake up the compaction thread and trigger a compactin. This is only used // for testing. void triggerCompactionForTesting() { XDCHECK(compactionWorker_); compactionWorker_->wakeUp(); } ObjectCacheStats::StatsAggregate getStats() const { return stats_->getAggregate(); } private: class CompactionWorker : public PeriodicWorker { public: explicit CompactionWorker( ObjectCache& objcache, typename ObjectCache::CompactionCallback compactionCb, typename ObjectCache::CompactionSync compactionSync, std::chrono::milliseconds compactionWork, std::chrono::milliseconds compactionSleep) : objcache_{objcache}, compactionCb_{std::move(compactionCb)}, compactionSync_{std::move(compactionSync)}, compactionWork_{compactionWork}, compactionSleep_{compactionSleep} {} void work() override { util::Throttler::Config config; config.sleepMs = compactionSleep_.count(); config.workMs = compactionWork_.count(); for (auto it = objcache_.getCacheAlloc().begin(config); it != objcache_.getCacheAlloc().end(); ++it) { if (shouldStopWork()) { return; } // Grab compaction sync for exclusive access to the object // while we perform compaction. This is to prevent any user // threads from mutating the object during compaction. std::unique_ptr<CompactionSyncObj> syncObj; if (compactionSync_) { syncObj = compactionSync_(it->getKey()); if (!syncObj->isValid()) { XLOGF(ERR, "Unable to grab compaction sync. Skipping key: {}", it->getKey()); continue; } } compactionCb_(it->getKey(), it->getMemory(), ObjectCacheCompactor{objcache_, std::move(syncObj)}); objcache_.stats_->compactions.inc(); } } private: ObjectCache& objcache_; typename ObjectCache::CompactionCallback compactionCb_; typename ObjectCache::CompactionSync compactionSync_; std::chrono::milliseconds compactionWork_; std::chrono::milliseconds compactionSleep_; }; static CacheAlloc* createCache(Config& config) { // TODO: we should allow user to specify their own remove callback. We can // just wrap it within object cache's own remove callback if (config.getCacheAllocatorConfig().removeCb) { throw std::invalid_argument("No remove callback allowed"); } // TODO: Move callback should trigger an object level compaction if (config.getCacheAllocatorConfig().moveCb) { throw std::invalid_argument("No remove callback allowed"); } if (config.getDestructorCallback()) { config.getCacheAllocatorConfig().setRemoveCallback( [dcb = config.getDestructorCallback()]( const typename CacheAlloc::RemoveCbData& data) { auto key = data.item.getKey(); dcb(key, data.item.getMemory(), data); }); } return new CacheAlloc(config.getCacheAllocatorConfig()); } mutable std::unique_ptr<ObjectCacheStats> stats_{ std::make_unique<ObjectCacheStats>()}; CacheAlloc* cache_{}; bool ownsCache_{false}; std::unique_ptr<CompactionWorker> compactionWorker_; SerializationCallback serializationCallback_; DeserializationCallback deserializationCallback_; }; } // namespace objcache } // namespace cachelib } // namespace facebook