cachelib/compact_cache/CCache-inl.h

/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

/**
 * This file implements the methods defined in CCache.h
 */

#include <folly/logging/xlog.h>

#include <typeinfo>

#include "cachelib/common/Hash.h"

/****************************************************************************/
/* CONFIGURATION */

namespace facebook {
namespace cachelib {

/****************************************************************************/
/* FORWARD DECLARATIONS */

namespace detail {

/****************************************************************************/
/* IMPLEMENTATION */

/**
 * Convenient macro for updating the stats about a particular operation and
 * returning the corresponding CCacheReturn value.
 * @param op_name Operation for which to update the stats.
 * @param rv      return value:
 *                -1 -> error
 *                 0 -> miss
 *                 1 -> hit
 */
#define UPDATE_STATS_AND_RETURN(op_name, rv) \
  do {                                       \
    ++stats_.tlStats().op_name;              \
    switch (rv) {                            \
    case -2:                                 \
      /* already incremented stat */         \
      return CCacheReturn::TIMEOUT;          \
    case -1:                                 \
      ++stats_.tlStats().op_name##Err;       \
      return CCacheReturn::ERROR;            \
    case 0:                                  \
      ++stats_.tlStats().op_name##Miss;      \
      return CCacheReturn::NOTFOUND;         \
    case 1:                                  \
      ++stats_.tlStats().op_name##Hit;       \
      return CCacheReturn::FOUND;            \
    default:                                 \
      XDCHECK(false);                        \
    }                                        \
  } while (0);                               \
  return CCacheReturn::ERROR;

/** Function template used to call a remove callback.
 * The caller must take care of verifying that the callback is valid,
 * i.e., the callback was not default constructed.
 *
 * This wrapper is needed so that the compact cache implementation can call
 * the remove callback in a consistent way (i.e., it does not check for the
 * existence of a value). The specialization takes care of stripping out the
 * values when the compact cache does not store values. */
template <typename CC>
void callRemoveCb(const typename CC::RemoveCb& cb,
                  typename CC::Key const& key,
                  typename CC::Value const* val,
                  RemoveContext const context,
                  typename std::enable_if<CC::kHasValues>::type* = 0) {
  cb(key, val, context);
}

template <typename CC>
void callRemoveCb(const typename CC::RemoveCb& cb,
                  typename CC::Key const& key,
                  typename CC::Value const* /*val*/ /* unused */,
                  RemoveContext const context,
                  typename std::enable_if<!CC::kHasValues>::type* = 0) {
  /* This compact cache does not store values, so the callback provided by the
   * user does not expect a value to be passed. */
  cb(key, context);
}
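/* Illustrative sketch of the remove-callback shapes the two overloads above
 * dispatch between. `MyCCache` is a hypothetical compact cache type used only
 * for this example; the exact callback typedefs live in the descriptor, so
 * treat the signatures below as an approximation:
 *
 *   // kHasValues == true: the callback also receives the stored value.
 *   MyCCache::RemoveCb withValue = [](const MyCCache::Key& key,
 *                                     const MyCCache::Value* val,
 *                                     RemoveContext context) { ... };
 *
 *   // kHasValues == false: the callback receives only the key.
 *   MyCCache::RemoveCb keyOnly = [](const MyCCache::Key& key,
 *                                   RemoveContext context) { ... };
 *
 * callRemoveCb<MyCCache>(...) picks the matching overload through
 * std::enable_if on MyCCache::kHasValues, so the rest of the implementation
 * can invoke removal callbacks uniformly. */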
/** Function template used to call a replace callback.
 * The caller must take care of verifying that the callback is valid,
 * i.e., the callback was not default constructed.
 *
 * This wrapper is needed so that the compact cache implementation can call
 * the replace callback in a consistent way (i.e., it does not check for the
 * existence of a value). The specialization takes care of stripping out the
 * values when the compact cache does not store values. */
template <typename CC>
void callReplaceCb(const typename CC::ReplaceCb& cb,
                   typename CC::Key const& key,
                   typename CC::Value const* old_val,
                   typename CC::Value const* new_val,
                   typename std::enable_if<CC::kHasValues>::type* = 0) {
  cb(key, old_val, new_val);
}

template <typename CC>
void callReplaceCb(const typename CC::ReplaceCb& cb,
                   typename CC::Key const& key,
                   typename CC::Value const* /*old_val*/ /* unused */,
                   typename CC::Value const* /*new_val*/ /* unused */,
                   typename std::enable_if<!CC::kHasValues>::type* = 0) {
  /* This compact cache does not store values, so the callback provided by the
   * user does not expect values to be passed. */
  cb(key);
}

} // namespace detail

template <typename C, typename A, typename B>
CompactCache<C, A, B>::CompactCache(Allocator& allocator, bool allowPromotions)
    : CompactCache(allocator, nullptr, nullptr, nullptr, allowPromotions) {}

template <typename C, typename A, typename B>
CompactCache<C, A, B>::CompactCache(Allocator& allocator,
                                    RemoveCb removeCb,
                                    ReplaceCb replaceCb,
                                    ValidCb validCb,
                                    bool allowPromotions)
    : allocator_(allocator),
      locks_(10 /* hashpower */, std::make_shared<MurmurHash2>()),
      removeCb_(removeCb),
      replaceCb_(replaceCb),
      validCb_(validCb),
      bucketsPerChunk_(allocator_.getChunkSize() / sizeof(Bucket)),
      stats_{},
      allowPromotions_(allowPromotions),
      numChunks_(allocator_.getNumChunks()),
      pendingNumChunks_(0) {
  allocator_.attach(this);
}

template <typename C, typename A, typename B>
CompactCache<C, A, B>::~CompactCache() {
  allocator_.detach();
}
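/* A rough sense of scale for the constructor above, using example numbers
 * that are not taken from this file: locks_ is built with hashpower 10,
 * presumably a pool of 2^10 = 1024 striped locks shared by all buckets, and
 * bucketsPerChunk_ is simply getChunkSize() / sizeof(Bucket). For instance,
 * if a chunk (slab) were 4 MB and sizeof(Bucket) were 64 bytes, one chunk
 * would hold 4 * 1024 * 1024 / 64 = 65536 buckets. The real values depend on
 * the allocator configuration and the bucket descriptor B. */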
/**
 * Move or purge entries that would change which chunk they hash to based on
 * the specified old and new numbers of chunks.
 *
 * @param oldNumChunks old size of compact cache
 * @param newNumChunks new size of compact cache
 * @param op           whether to create new entries (COPY) or delete
 *                     old ones (DELETE) in this call
 */
template <typename C, typename A, typename B>
void CompactCache<C, A, B>::tableRehash(size_t oldNumChunks,
                                        size_t newNumChunks,
                                        RehashOperation op) {
  XDCHECK_LE(newNumChunks, allocator_.getNumChunks());
  XDCHECK_GT(newNumChunks, 0u);
  XDCHECK_GT(oldNumChunks, 0u);

  /* Loop through all entries in all buckets of the hash table. */
  for (size_t n = 0; n < oldNumChunks; n++) {
    Bucket* table_chunk = reinterpret_cast<Bucket*>(allocator_.getChunk(n));
    for (size_t i = 0; i < bucketsPerChunk_; i++) {
      Bucket* bucket = &table_chunk[i];
      auto lock = locks_.lockExclusive(bucket);
      const size_t capacity = BucketDescriptor::nEntriesCapacity(*bucket);
      /* When expanding the cache (newNumChunks > oldNumChunks), move
       * the entire capacity of elements over.
       * When shrinking the cache to some fraction of its former size,
       * move that fraction of the to-be-deleted chunks in order to
       * maintain a non-zero cache lifetime throughout. */
      const size_t max_move = std::min(
          std::max((newNumChunks * capacity) / oldNumChunks, (size_t)1),
          capacity);
      size_t moved = 0;

      EntryHandle entry = BucketDescriptor::first(&table_chunk[i]);
      while (entry) {
        const bool valid_entry = !validCb_ || validCb_(entry.key());
        auto remove_context =
            valid_entry ? RemoveContext::kEviction : RemoveContext::kNormal;
        Bucket* new_chunk = tableFindChunk(newNumChunks, entry.key());
        if (new_chunk == table_chunk) {
          entry.next();
          continue;
        }

        /* Moving takes two forms. If newNumChunks is bigger, we move
         * everything to its new place in the newly allocated chunks.
         * If smaller, we need to preserve read-after-write, so we move
         * a small initial fraction of the newly lost buckets to their
         * new home, putting them at the head of the LRU.
         *
         * This occurs in two stages; first we insert the to-be-moved
         * entries into their new home and call the removal callbacks
         * for invalid or excess ones.
         *
         * Second, after reads have moved, we delete the old entries.
         * This is done in a new cohort to ensure reads never see
         * a temporarily disappeared entry in cache. */
        if (op == RehashOperation::COPY) {
          if (valid_entry && moved < max_move) {
            /* Add new entry. Offset is the same, so no need to
             * re-compute that hash. */
            Bucket* newBucket = &new_chunk[i];
            // lock ordering is established by the hash function:
            // old hash -> new hash.
            // However there can be arbitrary hash collisions, so
            // don't relock the same lock (we don't use recursive
            // locks in general).
            bool sameLock = locks_.isSameLock(newBucket, bucket);
            auto higher_lock =
                sameLock ? CCWriteHolder() : locks_.lockExclusive(newBucket);
            if (kHasValues) {
              if (kValuesFixedSize) {
                bucketSet(newBucket, entry.key(), entry.val());
              } else {
                bucketSet(newBucket, entry.key(), entry.val(), entry.size());
              }
            } else {
              bucketSet(newBucket, entry.key());
            }
            moved++;
          } else {
            // not moving this one, either invalid or we're full;
            // call evict (or delete if invalid) callback
            if (removeCb_) {
              detail::callRemoveCb<SelfType>(
                  removeCb_, entry.key(), entry.val(), remove_context);
            }
          }
          entry.next();
        } else {
          // on the second pass we run the deletes, to avoid
          // read requests seeing no data in the interim
          // actually delete here
          BucketDescriptor::del(entry);
          // no entry.next() call as del advances the handle
        }
      }
      XDCHECK(newNumChunks <= oldNumChunks || !entry);
    }
  }
}
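/* A worked example of the max_move computation in tableRehash above; the
 * numbers are purely illustrative. When shrinking from oldNumChunks = 4 to
 * newNumChunks = 1 with a bucket capacity of 8,
 * max_move = min(max((1 * 8) / 4, 1), 8) = 2, so of the entries that change
 * chunks, only the first two valid ones encountered in each bucket are copied
 * to their new home. When growing, e.g. from 2 to 4 chunks with capacity 8,
 * (4 * 8) / 2 = 16 is clamped to the capacity of 8, so every relocating entry
 * is moved. */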
template <typename C, typename A, typename B>
void CompactCache<C, A, B>::resize() {
  const size_t oldNumChunks = numChunks_;
  const size_t configuredSize = allocator_.getConfiguredSize();
  const size_t numChunksWanted = configuredSize / allocator_.getChunkSize();

  /* No change in size */
  if (oldNumChunks == numChunksWanted) {
    return;
  }

  /* Lock resize operations to prevent more than one from occurring
   * at a time. */
  auto lock = folly::SharedMutex::WriteHolder(resizeLock_);

  size_t newNumChunks = numChunksWanted;
  if (numChunksWanted > oldNumChunks) {
    /* Grow the cache. */
    newNumChunks = allocator_.resize();

    /* Check to see if we were able to get the requested number of chunks. */
    if (newNumChunks != numChunksWanted) {
      if (newNumChunks == oldNumChunks) {
        XLOG(CRITICAL) << "Failed to grow arena. Continuing with old size.";
        return;
      } else {
        XLOG(CRITICAL) << "Failed to grow arena by as much as we wanted.";
      }
    }
  }

  /* Update the number of chunks so new write requests start hitting both
   * old and new chunks. */
  XDCHECK_NE(newNumChunks, oldNumChunks);
  XDCHECK_EQ(pendingNumChunks_.load(), 0u);

  /* Only bother resharding if we are not going from or to a size of 0. */
  if (newNumChunks > 0 && oldNumChunks > 0) {
    pendingNumChunks_ = newNumChunks;

    /* Ensure all writes are double writing now. */
    cohort_.switchCohorts();

    /* Sweep through the table and find all buckets that should now rehash.
     * In offline mode, move entries; online we just delete
     * to avoid invalidate races. */
    tableRehash(oldNumChunks, newNumChunks, RehashOperation::COPY);

    // Now that the rehash happened, we can start reading from the new
    // location. Don't yet stop double writes, as that ordering (no double
    // write, read from old location) may not be guaranteed.
    // It likely is fine, as those are ordered by a lock which should order
    // the loads of the num_chunks value properly.
    numChunks_ = newNumChunks;
    cohort_.switchCohorts();

    // Now stop double writes and wait for all requests to complete
    // so that the old chunk locations are totally unused.
    pendingNumChunks_ = 0;
    cohort_.switchCohorts();

    // Now go through again and delete stale entries in the old location.
    tableRehash(oldNumChunks, newNumChunks, RehashOperation::DELETE);
  } else {
    numChunks_ = newNumChunks;
    // If we are disabling the compact cache (setting the size to 0), we need
    // to make sure all requests are completed before we can release the
    // chunks.
    cohort_.switchCohorts();
  }

  // Free slabs if we have extra.
  if (newNumChunks < oldNumChunks) {
    allocator_.resize();
  }
}
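/* The resize protocol above, summarized as an informal timeline of the steps
 * already implemented in resize() ("W" = writers, "R" = readers); this is a
 * restatement, not additional API:
 *
 *   1. pendingNumChunks_ = new  -> W start double-writing old + new chunks
 *      cohort_.switchCohorts()  -> drain writers unaware of the change
 *   2. tableRehash(COPY)        -> copy entries that change chunks
 *   3. numChunks_ = new         -> R start reading from the new location
 *      cohort_.switchCohorts()
 *   4. pendingNumChunks_ = 0    -> W stop double-writing
 *      cohort_.switchCohorts()  -> old locations now completely unused
 *   5. tableRehash(DELETE)      -> purge stale copies left in the old chunks
 *
 * At every point a key remains readable from at least one of its two possible
 * locations, which is what keeps reads consistent during the resize. */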
template <typename C, typename A, typename B>
CCacheReturn CompactCache<C, A, B>::set(
    const Key& key,
    const std::chrono::microseconds& timeout,
    const Value* val,
    size_t size) {
  int rv = callBucketFn(
      key, Operation::WRITE, timeout, &SelfType::bucketSet, val, size);
  UPDATE_STATS_AND_RETURN(set, rv);
}

template <typename C, typename A, typename B>
CCacheReturn CompactCache<C, A, B>::del(
    const Key& key,
    const std::chrono::microseconds& timeout,
    Value* val,
    size_t* size) {
  int rv = callBucketFn(
      key, Operation::WRITE, timeout, &SelfType::bucketDel, val, size);
  UPDATE_STATS_AND_RETURN(del, rv);
}

template <typename C, typename A, typename B>
CCacheReturn CompactCache<C, A, B>::get(
    const Key& key,
    const std::chrono::microseconds& timeout,
    Value* val,
    size_t* size,
    bool shouldPromote) {
  int rv = callBucketFn(key,
                        Operation::READ,
                        timeout,
                        &SelfType::bucketGet,
                        val,
                        size,
                        shouldPromote);
  UPDATE_STATS_AND_RETURN(get, rv);
}

template <typename C, typename A, typename B>
CCacheReturn CompactCache<C, A, B>::exists(
    const Key& key, const std::chrono::microseconds& timeout) {
  int rv = callBucketFn(key,
                        Operation::READ,
                        timeout,
                        &SelfType::bucketGet,
                        nullptr /* val */,
                        nullptr /* size */,
                        false /* shouldPromote */);
  UPDATE_STATS_AND_RETURN(get, rv);
}

template <typename C, typename A, typename B>
bool CompactCache<C, A, B>::purge(const PurgeFilter& shouldPurge) {
  auto bucketCallback = [&](Bucket* bucket) {
    EntryHandle entry = BucketDescriptor::first(bucket);
    while (entry) {
      auto rv = shouldPurge(entry.key(), kHasValues ? entry.val() : nullptr);
      if (rv == PurgeFilterResult::ABORT) {
        return false;
      } else if (rv == PurgeFilterResult::SKIP) {
        entry.next();
        continue;
      }
      if (removeCb_) {
        detail::callRemoveCb<SelfType>(
            removeCb_, entry.key(), entry.val(), RemoveContext::kNormal);
      }
      BucketDescriptor::del(entry);
      /* No need to call entry.next() here because
       * BucketDescriptor::del advances the entry. */
    }
    return true;
  };

  const bool rv = forEachBucket(bucketCallback);
  if (rv) {
    ++stats_.tlStats().purgeSuccess;
  } else {
    ++stats_.tlStats().purgeErr;
  }
  return rv;
}

/****************************************************************************/
/* IMPLEMENTATION */

template <typename C, typename A, typename B>
template <typename Fn, typename... Args>
int CompactCache<C, A, B>::callBucketFn(
    const Key& key,
    Operation op,
    const std::chrono::microseconds& timeout,
    Fn f,
    Args... args) {
  if (numChunks_ == 0) {
    return -1;
  }

  /* 1) Increase the refcount of the current cohort. */
  Cohort::Token tok = cohort_.incrActiveReqs();

  /* 2) Find the hash table bucket for the key. */
  Bucket* bucket = tableFindBucket(key);

  /* 3) Lock the bucket. immutable_bucket says whether we will leave the
   * bucket unmodified during this call: if so, we take a shared lock (reads
   * without promotion) and may need a second pass to promote; otherwise we
   * take an exclusive lock. */
  BucketReturn rv;
  bool immutable_bucket = (op == Operation::READ);

  /* 4) Call the request handler. */
  if (immutable_bucket) {
    auto lock = locks_.lockShared(timeout, bucket);
    if (!lock.locked()) {
      XDCHECK(timeout > std::chrono::microseconds::zero());
      ++stats_.tlStats().lockTimeout;
      return -2;
    }
    rv = (this->*f)(bucket, key, args...);
  } else {
    auto lock = locks_.lockExclusive(timeout, bucket);
    if (!lock.locked()) {
      XDCHECK(timeout > std::chrono::microseconds::zero());
      ++stats_.tlStats().lockTimeout;
      return -2;
    }
    rv = (this->*f)(bucket, key, args...);
  }

  /* 5.5) Promote if necessary from a read operation. */
  if (UNLIKELY(rv == BucketReturn::PROMOTE)) {
    XDCHECK(immutable_bucket);
    XDCHECK_EQ(op, Operation::READ);
    rv = BucketReturn::FOUND;
    if (allowPromotions_) {
      auto lock = locks_.lockExclusive(timeout, bucket);
      if (!lock.locked()) {
        XDCHECK(timeout > std::chrono::microseconds::zero());
        ++stats_.tlStats().promoteTimeout;
      } else {
        this->bucketPromote(bucket, key);
      }
    }
  }
  XDCHECK(rv != BucketReturn::PROMOTE);

  /* 6) Do the operation on the new location if necessary.
   * Note that although we release the old lock, the order is very
   * important. The rehasher will be doing: lock old -> read old ->
   * lock new -> write new -> unlock both.
   * So we need to make sure that our old update either happens before the
   * rehasher reads, or, if it happens after, that the new update also
   * happens after. I.e., the new update has to be second. */
  Bucket* bucketDbl =
      (op == Operation::WRITE) ? tableFindDblWriteBucket(key) : nullptr;
  if (bucketDbl != nullptr) {
    auto lockDbl = locks_.lockExclusive(bucketDbl);
    if ((this->*f)(bucketDbl, key, args...) == BucketReturn::ERROR) {
      rv = BucketReturn::ERROR;
    }
  }

  XDCHECK_NE(toInt(rv), 2);
  return toInt(rv);
}
template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::Bucket* CompactCache<C, A, B>::tableFindChunk(
    size_t numChunks, const Key& key) {
  XDCHECK_GT(numChunks, 0u);
  XDCHECK_LE(numChunks, allocator_.getNumChunks());
  /* furcHash is well behaved; numChunks <= 1 returns 0 for chunkIndex */
  auto chunkIndex = facebook::cachelib::furcHash(
      reinterpret_cast<const void*>(&key), sizeof(key), numChunks);
  return reinterpret_cast<Bucket*>(allocator_.getChunk(chunkIndex));
}

template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::Bucket* CompactCache<C, A, B>::tableFindBucket(
    const Key& key) {
  Bucket* chunk = tableFindChunk(numChunks_, key);
  uint32_t hv =
      MurmurHash2()(reinterpret_cast<const void*>(&key), sizeof(key));
  return &chunk[hv % bucketsPerChunk_];
}

template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::Bucket*
CompactCache<C, A, B>::tableFindDblWriteBucket(const Key& key) {
  // this var is volatile and can be 0, which will cause
  // asserts later; avoid this case
  size_t pending = pendingNumChunks_;
  if (pending == 0) {
    return nullptr;
  }
  Bucket* chunk = tableFindChunk(numChunks_, key);
  Bucket* chunkNew = tableFindChunk(pending, key);
  if (chunk == chunkNew) {
    return nullptr;
  }
  uint32_t hv =
      MurmurHash2()(reinterpret_cast<const void*>(&key), sizeof(key));
  return &chunkNew[hv % bucketsPerChunk_];
}
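/* Addressing sketch for the three lookup helpers above (illustrative only):
 * a key is mapped to memory in two steps. First, furcHash picks a chunk with
 * consistent-hashing behavior, so changing the chunk count only relocates the
 * keys whose chunk actually changes. Second, MurmurHash2 picks a bucket
 * within that chunk:
 *
 *   chunkIndex  = furcHash(&key, sizeof(key), numChunks);
 *   bucketIndex = MurmurHash2()(&key, sizeof(key)) % bucketsPerChunk_;
 *
 * Because bucketIndex does not depend on the chunk count, an entry keeps the
 * same offset inside its chunk across resizes, which is what tableRehash
 * relies on when it reuses the bucket index `i` in the new chunk. */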
template <typename C, typename A, typename B>
void CompactCache<C, A, B>::onEntryEvicted(const EntryHandle& handle) {
  ++stats_.tlStats().evictions;
  XDCHECK(handle);
  if (removeCb_) {
    RemoveContext c = validCb_ && !validCb_(handle.key())
                          ? RemoveContext::kNormal
                          : RemoveContext::kEviction;
    detail::callRemoveCb<SelfType>(removeCb_, handle.key(), handle.val(), c);
  }
}

template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::BucketReturn CompactCache<C, A, B>::bucketSet(
    Bucket* bucket, const Key& key, const Value* val, size_t size) {
  if ((kHasValues && (val == nullptr)) ||
      (kValuesFixedSize && size != 0 && size != sizeof(Value))) {
    // val should not be null if kHasValues is true.
    // size should be 0 or sizeof(Value) if kValuesFixedSize is true.
    return BucketReturn::ERROR;
  }

  using namespace std::placeholders;
  auto evictionCallback =
      std::bind(&SelfType::onEntryEvicted, this, std::placeholders::_1);

  /* Look for an existing entry to be updated. */
  EntryHandle entry = bucketFind(bucket, key);
  if (!entry) {
    int rv =
        BucketDescriptor::insert(bucket, key, val, size, evictionCallback);
    if (rv == -1) {
      /* An error occurred when inserting. */
      XLOG(ERR) << "Unable to insert entry in compact cache.";
      return BucketReturn::ERROR;
    }
    /* This is a miss, meaning we inserted something new. */
    return BucketReturn::NOTFOUND;
  } else {
    if (replaceCb_) {
      detail::callReplaceCb<SelfType>(replaceCb_, key, entry.val(), val);
    }
    /* Update the value of the already existing entry. */
    BucketDescriptor::updateVal(entry, val, size, evictionCallback);
    /* This is a hit. */
    return BucketReturn::FOUND;
  }
}

template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::BucketReturn CompactCache<C, A, B>::bucketDel(
    Bucket* bucket, const Key& key, Value* val, size_t* size) {
  EntryHandle entry = bucketFind(bucket, key);
  if (!entry) {
    return BucketReturn::NOTFOUND;
  }
  if (kHasValues && val != nullptr) {
    BucketDescriptor::copyVal(val, size, entry);
  }
  if (removeCb_) {
    detail::callRemoveCb<SelfType>(
        removeCb_, key, entry.val(), RemoveContext::kNormal);
  }
  BucketDescriptor::del(entry);
  return BucketReturn::FOUND;
}

template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::BucketReturn CompactCache<C, A, B>::bucketGet(
    Bucket* bucket,
    const Key& key,
    Value* val,
    size_t* size,
    bool shouldPromote) {
  EntryHandle entry = bucketFind(bucket, key);
  if (!entry) {
    return BucketReturn::NOTFOUND;
  }
  if (kHasValues && val != nullptr) {
    BucketDescriptor::copyVal(val, size, entry);
  }
  if (entry.isBucketTail()) {
    ++stats_.tlStats().tailHits;
  }
  return shouldPromote && BucketDescriptor::needs_promote(entry)
             ? BucketReturn::PROMOTE
             : BucketReturn::FOUND;
}

template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::BucketReturn
CompactCache<C, A, B>::bucketPromote(Bucket* bucket, const Key& key) {
  EntryHandle entry = bucketFind(bucket, key);
  if (UNLIKELY(!entry)) {
    return BucketReturn::NOTFOUND;
  }
  if (BucketDescriptor::needs_promote(entry)) {
    BucketDescriptor::promote(entry);
  }
  return BucketReturn::FOUND;
}

/** This iterates over all the entries in the bucket and compares their keys
 * with the given key until a match is found. */
template <typename C, typename A, typename B>
typename CompactCache<C, A, B>::EntryHandle CompactCache<C, A, B>::bucketFind(
    Bucket* bucket, const Key& key) {
  for (EntryHandle handle = BucketDescriptor::first(bucket); handle;
       handle.next()) {
    if (handle.key() == key) {
      /* Entry found. */
      return handle;
    }
  }
  /* Entry not found. */
  return EntryHandle();
}

template <typename C, typename A, typename B>
bool CompactCache<C, A, B>::forEachBucket(const BucketCallBack& cb) {
  auto lock = folly::SharedMutex::ReadHolder(resizeLock_);

  // This obtains the resize lock, so it cannot be occurring during an actual
  // resize; assert that.
  XDCHECK_EQ(pendingNumChunks_.load(), 0u);

  /* Loop through all buckets in the table. */
  for (size_t n = 0; n < numChunks_; n++) {
    Bucket* tableChunk = reinterpret_cast<Bucket*>(allocator_.getChunk(n));
    for (size_t i = 0; i < bucketsPerChunk_; i++) {
      /* Lock the bucket. */
      Bucket* bucket = &tableChunk[i];
      auto bucketLock = locks_.lockExclusive(bucket);
      if (!cb(bucket)) {
        return false;
      }
    }
  }
  return true;
}

} // namespace cachelib
} // namespace facebook
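// A minimal usage sketch based only on the methods implemented in this file.
// The instance `ccache`, its Key/Value types, and the way it is created are
// assumptions here; construction is owned by the cache allocator and the
// declarations in CCache.h, not by this file.
//
//   using std::chrono::microseconds;
//
//   Value v = makeValue();                              // hypothetical helper
//   ccache.set(key, microseconds(100), &v, sizeof(v));  // NOTFOUND = inserted,
//                                                       // FOUND = updated
//   Value out;
//   size_t outSize = 0;
//   auto rc = ccache.get(key, microseconds(100), &out, &outSize, true);
//   if (rc == CCacheReturn::TIMEOUT) {
//     // the bucket lock could not be acquired within 100us
//   }
//
//   ccache.del(key, microseconds(100), nullptr, nullptr);
//
// The timeout-failure paths in callBucketFn assert that the configured
// timeout was non-zero, which suggests that a zero timeout simply blocks
// until the bucket lock is acquired.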