cachelib/allocator/ChainedHashTable-inl.h (501 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <stdexcept>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"
#include <folly/Format.h>
#include <folly/Range.h>
#pragma GCC diagnostic pop
namespace facebook {
namespace cachelib {
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
ChainedHashTable::Impl<T, HookPtr>::Impl(size_t numBuckets,
const PtrCompressor& compressor,
const Hasher& hasher)
: numBuckets_(numBuckets),
numBucketsMask_(numBuckets - 1),
compressor_(compressor),
hasher_(hasher) {
if (numBuckets == 0) {
throw std::invalid_argument("Can not have 0 buckets");
}
if (numBuckets & (numBuckets - 1)) {
throw std::invalid_argument("Number of buckets must be a power of two");
}
hashTable_ = std::make_unique<CompressedPtr[]>(numBuckets_);
CompressedPtr* memStart = hashTable_.get();
std::fill(memStart, memStart + numBuckets_, CompressedPtr{});
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
ChainedHashTable::Impl<T, HookPtr>::Impl(size_t numBuckets,
void* memStart,
const PtrCompressor& compressor,
const Hasher& hasher,
bool resetMem)
: numBuckets_(numBuckets),
numBucketsMask_(numBuckets - 1),
hashTable_(static_cast<CompressedPtr*>(memStart)),
restorable_(true),
compressor_(compressor),
hasher_(hasher) {
if (numBuckets == 0) {
throw std::invalid_argument("Can not have 0 buckets");
}
if (numBuckets & (numBuckets - 1)) {
throw std::invalid_argument("Number of buckets must be a power of two");
}
if (resetMem) {
CompressedPtr* memStartBucket = static_cast<CompressedPtr*>(memStart);
std::fill(memStartBucket, memStartBucket + numBuckets_, CompressedPtr{});
}
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
ChainedHashTable::Impl<T, HookPtr>::~Impl() {
if (restorable_) {
hashTable_.release();
}
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
typename ChainedHashTable::Impl<T, HookPtr>::BucketId
ChainedHashTable::Impl<T, HookPtr>::getBucket(
typename T::Key k) const noexcept {
return (*hasher_)(k.data(), k.size()) & numBucketsMask_;
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
bool ChainedHashTable::Impl<T, HookPtr>::insertInBucket(
T& node, BucketId bucket) noexcept {
XDCHECK_LT(bucket, numBuckets_);
const auto existing = findInBucket(node.getKey(), bucket);
if (existing != nullptr) {
// already there
return false;
}
// insert at the head of the bucket
const auto head = hashTable_[bucket];
hashTable_[bucket] = compressor_.compress(&node);
setHashNext(node, head);
return true;
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
T* ChainedHashTable::Impl<T, HookPtr>::insertOrReplaceInBucket(
T& node, BucketId bucket) noexcept {
XDCHECK_LT(bucket, numBuckets_);
// See if we can find the key and the previous node
T* curr = compressor_.unCompress(hashTable_[bucket]);
T* prev = nullptr;
const auto key = node.getKey();
while (curr != nullptr && key != curr->getKey()) {
prev = curr;
curr = getHashNext(*curr);
}
// insert if the key doesn't exist
if (!curr) {
const auto head = hashTable_[bucket];
hashTable_[bucket] = compressor_.compress(&node);
setHashNext(node, head);
return nullptr;
}
// replace
if (prev) {
setHashNext(*prev, &node);
} else {
hashTable_[bucket] = compressor_.compress(&node);
}
setHashNext(node, getHashNext(*curr));
return curr;
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
void ChainedHashTable::Impl<T, HookPtr>::removeFromBucket(
T& node, BucketId bucket) noexcept {
// node must be present in hashtable.
XDCHECK_EQ(reinterpret_cast<uintptr_t>(findInBucket(node.getKey(), bucket)),
reinterpret_cast<uintptr_t>(&node))
<< node.toString();
T* const prev = findPrevInBucket(node, bucket);
if (prev != nullptr) {
setHashNext(*prev, getHashNext(node));
} else {
XDCHECK_EQ(reinterpret_cast<uintptr_t>(&node),
reinterpret_cast<uintptr_t>(
compressor_.unCompress(hashTable_[bucket])));
hashTable_[bucket] = getHashNextCompressed(node);
}
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
T* ChainedHashTable::Impl<T, HookPtr>::findInBucket(
Key key, BucketId bucket) const noexcept {
XDCHECK_LT(bucket, numBuckets_);
T* curr = compressor_.unCompress(hashTable_[bucket]);
while (curr != nullptr && curr->getKey() != key) {
curr = getHashNext(*curr);
}
return curr;
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
T* ChainedHashTable::Impl<T, HookPtr>::findPrevInBucket(
const T& node, BucketId bucket) const noexcept {
XDCHECK_LT(bucket, numBuckets_);
T* curr = compressor_.unCompress(hashTable_[bucket]);
T* prev = nullptr;
const auto key = node.getKey();
while (curr != nullptr && key != curr->getKey()) {
prev = curr;
curr = getHashNext(*curr);
}
// node must be in the hashtable
XDCHECK(curr != nullptr);
return prev;
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
template <typename F>
void ChainedHashTable::Impl<T, HookPtr>::forEachBucketElem(BucketId bucket,
F&& func) const {
XDCHECK_LT(bucket, numBuckets_);
T* curr = compressor_.unCompress(hashTable_[bucket]);
while (curr != nullptr) {
func(curr);
curr = getHashNext(*curr);
}
}
template <typename T, typename ChainedHashTable::Hook<T> T::*HookPtr>
unsigned int ChainedHashTable::Impl<T, HookPtr>::getBucketNumElems(
BucketId bucket) const {
XDCHECK_LT(bucket, numBuckets_);
T* curr = compressor_.unCompress(hashTable_[bucket]);
unsigned int numElems = 0;
while (curr != nullptr) {
++numElems;
curr = getHashNext(*curr);
}
return numElems;
}
// AccessContainer interface
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
ChainedHashTable::Container<T, HookPtr, LockT>::Container(
const serialization::ChainedHashTableObject& object,
const Config& config,
ShmAddr memSegment,
const PtrCompressor& compressor,
HandleMaker hm)
: Container(object,
config,
memSegment.addr,
memSegment.size,
compressor,
std::move(hm)) {}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
ChainedHashTable::Container<T, HookPtr, LockT>::Container(
const serialization::ChainedHashTableObject& object,
const Config& config,
void* memStart,
size_t nBytes,
const PtrCompressor& compressor,
HandleMaker hm)
: config_{config},
handleMaker_(std::move(hm)),
ht_{config_.getNumBuckets(), memStart, compressor, config_.getHasher(),
false /* resetMem */},
locks_{config_.getLocksPower(), config_.getHasher()},
numKeys_(*object.numKeys()) {
if (config_.getBucketsPower() !=
static_cast<uint32_t>(*object.bucketsPower())) {
throw std::invalid_argument(folly::sformat(
"Hashtable bucket power not compatible. old = {}, new = {}",
*object.bucketsPower(),
config.getBucketsPower()));
}
if (nBytes != ht_.size()) {
throw std::invalid_argument(
folly::sformat("Hashtable size not compatible. old = {}, new = {}",
ht_.size(),
nBytes));
}
// checking hasher magic id not equal to 0 is to ensure it'll be
// a warm roll going from a cachelib without hasher magic id to
// one with a magic id
if (*object.hasherMagicId() != 0 &&
*object.hasherMagicId() != config_.getHasher()->getMagicId()) {
throw std::invalid_argument(folly::sformat(
"Hash object's ID mismatch. expected = {}, actual = {}",
*object.hasherMagicId(), config_.getHasher()->getMagicId()));
}
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
typename ChainedHashTable::Container<T, HookPtr, LockT>::DistributionStats
ChainedHashTable::Container<T, HookPtr, LockT>::getDistributionStats() const {
const auto now = util::getCurrentTimeSec();
const uint64_t numKeys = numKeys_;
std::unique_lock<std::mutex> statsLockGuard(cachedStatsLock_);
const auto numKeysDifference = numKeys > cachedStats_.numKeys
? numKeys - cachedStats_.numKeys
: cachedStats_.numKeys - numKeys;
const bool needToRecompute =
(now - cachedStatsUpdateTime_ > 10 * 60 /* seconds */) ||
(cachedStats_.numKeys > 0 &&
(static_cast<double>(numKeysDifference) /
static_cast<double>(cachedStats_.numKeys) >
0.05));
// return the cached value or if someone else is already computing.
if (!needToRecompute || !canRecomputeDistributionStats_) {
return cachedStats_;
}
// record that we are iterating so that we dont cause everyone who
// observes this to recompute
canRecomputeDistributionStats_ = false;
// release the lock.
statsLockGuard.unlock();
// compute the distribution
std::map<unsigned int, uint64_t> distribution;
const auto numBuckets = ht_.getNumBuckets();
for (BucketId currBucket = 0; currBucket < numBuckets; ++currBucket) {
auto l = locks_.lockShared(currBucket);
++distribution[ht_.getBucketNumElems(currBucket)];
}
// acquire lock
statsLockGuard.lock();
cachedStats_.numKeys = numKeys;
cachedStats_.itemDistribution = std::move(distribution);
cachedStats_.numBuckets = ht_.getNumBuckets();
cachedStatsUpdateTime_ = now;
canRecomputeDistributionStats_ = true;
return cachedStats_;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
bool ChainedHashTable::Container<T, HookPtr, LockT>::insert(T& node) noexcept {
if (node.isAccessible()) {
// already in hash table.
return false;
}
const auto bucket = ht_.getBucket(node.getKey());
auto l = locks_.lockExclusive(bucket);
const bool res = ht_.insertInBucket(node, bucket);
if (res) {
node.markAccessible();
numKeys_.fetch_add(1, std::memory_order_relaxed);
}
return res;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
typename T::Handle
ChainedHashTable::Container<T, HookPtr, LockT>::insertOrReplace(T& node) {
if (node.isAccessible()) {
return handleMaker_(nullptr);
}
const auto bucket = ht_.getBucket(node.getKey());
auto l = locks_.lockExclusive(bucket);
T* oldNode = ht_.insertOrReplaceInBucket(node, bucket);
XDCHECK_NE(reinterpret_cast<uintptr_t>(&node),
reinterpret_cast<uintptr_t>(oldNode));
// grab a handle to the old node before we mark it as not being in the hash
// table.
typename T::Handle handle;
try {
handle = handleMaker_(oldNode);
} catch (const std::exception&) {
// put the element back since we failed to grab handle.
ht_.insertOrReplaceInBucket(*oldNode, bucket);
XDCHECK_EQ(
reinterpret_cast<uintptr_t>(ht_.findInBucket(node.getKey(), bucket)),
reinterpret_cast<uintptr_t>(oldNode))
<< oldNode->toString();
throw;
}
node.markAccessible();
if (oldNode) {
oldNode->unmarkAccessible();
} else {
numKeys_.fetch_add(1, std::memory_order_relaxed);
}
return handle;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
bool ChainedHashTable::Container<T, HookPtr, LockT>::replaceIfAccessible(
T& oldNode, T& newNode) noexcept {
return replaceIf(oldNode, newNode, [](T&) { return true; });
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
template <typename F>
bool ChainedHashTable::Container<T, HookPtr, LockT>::replaceIf(T& oldNode,
T& newNode,
F&& predicate) {
const auto key = newNode.getKey();
const auto bucket = ht_.getBucket(key);
auto l = locks_.lockExclusive(bucket);
if (oldNode.isAccessible() && predicate(oldNode)) {
ht_.insertOrReplaceInBucket(newNode, bucket);
oldNode.unmarkAccessible();
newNode.markAccessible();
return true;
}
return false;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
bool ChainedHashTable::Container<T, HookPtr, LockT>::remove(T& node) noexcept {
const auto bucket = ht_.getBucket(node.getKey());
auto l = locks_.lockExclusive(bucket);
// check inside the lock to prevent from racing removes
if (!node.isAccessible()) {
return false;
}
ht_.removeFromBucket(node, bucket);
node.unmarkAccessible();
numKeys_.fetch_sub(1, std::memory_order_relaxed);
return true;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
typename T::Handle ChainedHashTable::Container<T, HookPtr, LockT>::removeIf(
T& node, const std::function<bool(const T& node)>& predicate) {
const auto bucket = ht_.getBucket(node.getKey());
auto l = locks_.lockExclusive(bucket);
// check inside the lock to prevent from racing removes
if (node.isAccessible() && predicate(node)) {
// grab the handle before we do any other state change. this ensures that
// if handle maker throws an exception, we leave the item in a consistent
// state.
auto handle = handleMaker_(&node);
ht_.removeFromBucket(node, bucket);
node.unmarkAccessible();
numKeys_.fetch_sub(1, std::memory_order_relaxed);
return handle;
} else {
return handleMaker_(nullptr);
}
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
typename T::Handle ChainedHashTable::Container<T, HookPtr, LockT>::find(
Key key) const {
const auto bucket = ht_.getBucket(key);
auto l = locks_.lockShared(bucket);
return handleMaker_(ht_.findInBucket(key, bucket));
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
serialization::ChainedHashTableObject
ChainedHashTable::Container<T, HookPtr, LockT>::saveState() const {
if (!ht_.isRestorable()) {
throw std::logic_error(
"hashtable is not restorable since the memory is not managed by user");
}
if (numIterators_ != 0) {
throw std::logic_error(
folly::sformat("There are {} pending iterators", numIterators_.load()));
}
serialization::ChainedHashTableObject object;
*object.bucketsPower() = config_.getBucketsPower();
*object.locksPower() = config_.getLocksPower();
*object.numKeys() = numKeys_;
*object.hasherMagicId() = config_.getHasher()->getMagicId();
return object;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
void ChainedHashTable::Container<T, HookPtr, LockT>::getBucketElems(
BucketId bucket, std::vector<Handle>& handles) const {
handles.clear();
auto l = locks_.lockShared(bucket);
ht_.forEachBucketElem(bucket, [this, &handles](T* e) {
try {
XDCHECK(e);
handles.emplace_back(handleMaker_(e));
} catch (const std::exception&) {
// if we are not able to acquire a handle, skip over them.
}
});
}
// Container's Iterator
// with/without throtter to iterate
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
typename ChainedHashTable::Container<T, HookPtr, LockT>::Iterator&
ChainedHashTable::Container<T, HookPtr, LockT>::Iterator::operator++() {
if (throttler_) {
throttler_->throttle();
}
++curSor_;
if (curSor_ < bucketElems_.size()) {
return *this;
}
++currBucket_;
for (; currBucket_ < container_->config_.getNumBuckets(); ++currBucket_) {
container_->getBucketElems(currBucket_, bucketElems_);
if (!bucketElems_.empty()) {
curSor_ = 0;
return *this;
} else if (throttler_) {
throttler_->throttle();
}
}
// reach the end
bucketElems_.clear();
curSor_ = 0;
return *this;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
T& ChainedHashTable::Container<T, HookPtr, LockT>::Iterator::operator*() {
return *curr();
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
ChainedHashTable::Container<T, HookPtr, LockT>::Iterator::Iterator(
Container<T, HookPtr, LockT>& container,
folly::Optional<util::Throttler::Config> throttlerConfig)
: container_(&container) {
if (throttlerConfig) {
throttler_.assign(util::Throttler(*throttlerConfig));
}
++container_->numIterators_;
reset();
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
ChainedHashTable::Container<T, HookPtr, LockT>::Iterator::Iterator(
Iterator&& other) noexcept
: container_{other.container_},
currBucket_{other.currBucket_},
curSor_{other.curSor_},
bucketElems_(std::move(other.bucketElems_)) {
// increment the iterator count when we move.
++container_->numIterators_;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
typename ChainedHashTable::Container<T, HookPtr, LockT>::Iterator&
ChainedHashTable::Container<T, HookPtr, LockT>::Iterator::operator=(
Iterator&& other) noexcept {
if (this != &other) {
this->~Iterator();
new (this) Iterator(std::move(other));
}
return *this;
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
ChainedHashTable::Container<T, HookPtr, LockT>::Iterator::Iterator(
Container<T, HookPtr, LockT>& container, EndIterT)
: container_(&container), currBucket_{container_->config_.getNumBuckets()} {
// increment the iterator for both the end and begin() types so that the
// destructor can just blindly decrement.
++container_->numIterators_;
XDCHECK_EQ(0u, curSor_);
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
typename ChainedHashTable::Container<T, HookPtr, LockT>::Iterator
ChainedHashTable::Container<T, HookPtr, LockT>::begin(
folly::Optional<util::Throttler::Config> throttlerConfig) {
return Iterator(*this, throttlerConfig);
}
template <typename T,
typename ChainedHashTable::Hook<T> T::*HookPtr,
typename LockT>
void ChainedHashTable::Container<T, HookPtr, LockT>::Iterator::reset() {
curSor_ = 0;
currBucket_ = 0;
container_->getBucketElems(currBucket_, bucketElems_);
while (bucketElems_.empty() &&
++currBucket_ < container_->config_.getNumBuckets()) {
if (throttler_) {
throttler_->throttle();
}
container_->getBucketElems(currBucket_, bucketElems_);
}
XDCHECK_EQ(0u, curSor_);
}
} // namespace cachelib
} // namespace facebook