/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cachelib/navy/bighash/BigHash.h"
#include <folly/Format.h>
#include <chrono>
#include <mutex>
#include <shared_mutex>
#include "cachelib/navy/bighash/Bucket.h"
#include "cachelib/navy/common/Utils.h"
#include "cachelib/navy/serialization/Serialization.h"
namespace facebook {
namespace cachelib {
namespace navy {
namespace {
constexpr uint64_t kMinSizeDistribution = 64;
constexpr double kSizeDistributionGranularityFactor = 1.25;
} // namespace
constexpr uint32_t BigHash::kFormatVersion;
BigHash::Config& BigHash::Config::validate() {
if (cacheSize < bucketSize) {
throw std::invalid_argument(
folly::sformat("cache size: {} cannot be smaller than bucket size: {}",
cacheSize,
bucketSize));
}
if (!folly::isPowTwo(bucketSize)) {
throw std::invalid_argument(
folly::sformat("invalid bucket size: {}", bucketSize));
}
if (cacheSize > uint64_t{bucketSize} << 32) {
throw std::invalid_argument(folly::sformat(
"Can't address big hash with 32 bits. Cache size: {}, bucket size: {}",
cacheSize,
bucketSize));
}
if (cacheBaseOffset % bucketSize != 0 || cacheSize % bucketSize != 0) {
throw std::invalid_argument(folly::sformat(
"cacheBaseOffset and cacheSize need to be a multiple of bucketSize. "
"cacheBaseOffset: {}, cacheSize:{}, bucketSize: {}.",
cacheBaseOffset,
cacheSize,
bucketSize));
}
if (device == nullptr) {
throw std::invalid_argument("device cannot be null");
}
if (bloomFilter && bloomFilter->numFilters() != numBuckets()) {
throw std::invalid_argument(
folly::sformat("bloom filter #filters mismatch #buckets: {} vs {}",
bloomFilter->numFilters(),
numBuckets()));
}
return *this;
}
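
// The delegating constructor below runs Config::validate() before any member
// is initialized. Illustrative usage (a sketch only; see BigHash.h for the
// exact Config fields and types):
//
//   BigHash::Config cfg;
//   cfg.cacheSize = 64 * 1024 * 1024;  // multiple of bucketSize
//   cfg.bucketSize = 4096;             // power of two
//   cfg.cacheBaseOffset = 0;           // multiple of bucketSize
//   cfg.device = &device;              // must be non-null
//   BigHash bigHash{std::move(cfg)};   // throws std::invalid_argument if invalid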
BigHash::BigHash(Config&& config)
: BigHash{std::move(config.validate()), ValidConfigTag{}} {}
BigHash::BigHash(Config&& config, ValidConfigTag)
: destructorCb_{[this, cb = std::move(config.destructorCb)](
BufferView key,
BufferView value,
DestructorEvent event) {
sizeDist_.removeSize(key.size() + value.size());
if (cb) {
cb(key, value, event);
}
}},
bucketSize_{config.bucketSize},
cacheBaseOffset_{config.cacheBaseOffset},
numBuckets_{config.numBuckets()},
bloomFilter_{std::move(config.bloomFilter)},
device_{*config.device},
sizeDist_{kMinSizeDistribution, bucketSize_,
kSizeDistributionGranularityFactor} {
XLOGF(INFO,
"BigHash created: buckets: {}, bucket size: {}, base offset: {}",
numBuckets_,
bucketSize_,
cacheBaseOffset_);
reset();
}
void BigHash::reset() {
XLOG(INFO, "Reset BigHash");
generationTime_ = getSteadyClock();
if (bloomFilter_) {
bloomFilter_->reset();
}
itemCount_.set(0);
insertCount_.set(0);
succInsertCount_.set(0);
lookupCount_.set(0);
succLookupCount_.set(0);
removeCount_.set(0);
succRemoveCount_.set(0);
evictionCount_.set(0);
logicalWrittenCount_.set(0);
physicalWrittenCount_.set(0);
ioErrorCount_.set(0);
bfFalsePositiveCount_.set(0);
bfProbeCount_.set(0);
checksumErrorCount_.set(0);
sizeDist_.reset();
usedSizeBytes_.set(0);
}
double BigHash::bfFalsePositivePct() const {
const auto probes = bfProbeCount_.get();
if (bloomFilter_ && probes > 0) {
return 100.0 * bfFalsePositiveCount_.get() / probes;
} else {
return 0;
}
}
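
// Maximum item size a single bucket can hold: the bucket size minus the
// in-bucket header (sizeof(Bucket)) and the slot overhead BucketStorage adds
// per BucketEntry.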
uint64_t BigHash::getMaxItemSize() const {
auto itemOverhead = BucketStorage::slotSize(sizeof(details::BucketEntry));
return bucketSize_ - sizeof(Bucket) - itemOverhead;
}
void BigHash::getCounters(const CounterVisitor& visitor) const {
visitor("navy_bh_items", itemCount_.get());
visitor("navy_bh_inserts", insertCount_.get());
visitor("navy_bh_succ_inserts", succInsertCount_.get());
visitor("navy_bh_lookups", lookupCount_.get());
visitor("navy_bh_succ_lookups", succLookupCount_.get());
visitor("navy_bh_removes", removeCount_.get());
visitor("navy_bh_succ_removes", succRemoveCount_.get());
visitor("navy_bh_evictions", evictionCount_.get());
visitor("navy_bh_logical_written", logicalWrittenCount_.get());
visitor("navy_bh_physical_written", physicalWrittenCount_.get());
visitor("navy_bh_io_errors", ioErrorCount_.get());
visitor("navy_bh_bf_false_positive_pct", bfFalsePositivePct());
visitor("navy_bh_bf_lookups", bfProbeCount_.get());
visitor("navy_bh_bf_rebuilds", bfRebuildCount_.get());
visitor("navy_bh_checksum_errors", checksumErrorCount_.get());
visitor("navy_bh_used_size_bytes", usedSizeBytes_.get());
auto snapshot = sizeDist_.getSnapshot();
for (auto& kv : snapshot) {
auto statName = folly::sformat("navy_bh_approx_bytes_in_size_{}", kv.first);
visitor(statName.c_str(), kv.second);
}
}
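
// Serializes BigHash metadata (format version, generation time, item count,
// bucket geometry and the size distribution) to the record writer, followed
// by the bloom filter if one is configured.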
void BigHash::persist(RecordWriter& rw) {
XLOG(INFO, "Starting bighash persist");
serialization::BigHashPersistentData pd;
*pd.version_ref() = kFormatVersion;
*pd.generationTime_ref() = generationTime_.count();
*pd.itemCount_ref() = itemCount_.get();
*pd.bucketSize_ref() = bucketSize_;
*pd.cacheBaseOffset_ref() = cacheBaseOffset_;
*pd.numBuckets_ref() = numBuckets_;
*pd.sizeDist_ref() = sizeDist_.getSnapshot();
serializeProto(pd, rw);
if (bloomFilter_) {
bloomFilter_->persist<ProtoSerializer>(rw);
XLOG(INFO, "bloom filter persist done");
}
XLOG(INFO, "Finished bighash persist");
}
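
// Restores state persisted by persist(): verifies the format version and that
// the bucket geometry matches the current config, then restores the item
// count, size distribution and bloom filter. On any failure the cache is
// reset and false is returned.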
bool BigHash::recover(RecordReader& rr) {
XLOG(INFO, "Starting bighash recovery");
try {
auto pd = deserializeProto<serialization::BigHashPersistentData>(rr);
if (*pd.version_ref() != kFormatVersion) {
throw std::logic_error{
folly::sformat("invalid format version {}, expected {}",
*pd.version_ref(),
kFormatVersion)};
}
auto configEquals =
static_cast<uint64_t>(*pd.bucketSize_ref()) == bucketSize_ &&
static_cast<uint64_t>(*pd.cacheBaseOffset_ref()) == cacheBaseOffset_ &&
static_cast<uint64_t>(*pd.numBuckets_ref()) == numBuckets_;
if (!configEquals) {
auto configStr = serializeToJson(pd);
XLOGF(ERR, "Recovery config: {}", configStr.c_str());
throw std::logic_error{"config mismatch"};
}
generationTime_ = std::chrono::nanoseconds{*pd.generationTime_ref()};
itemCount_.set(*pd.itemCount_ref());
sizeDist_ = SizeDistribution{*pd.sizeDist_ref()};
if (bloomFilter_) {
bloomFilter_->recover<ProtoSerializer>(rr);
XLOG(INFO, "Recovered bloom filter");
}
} catch (const std::exception& e) {
XLOGF(ERR, "Exception: {}", e.what());
XLOG(ERR, "Failed to recover bighash. Resetting cache.");
reset();
return false;
}
XLOG(INFO, "Finished bighash recovery");
return true;
}
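
// Insert flow: under the bucket's exclusive lock we read the whole bucket
// from the device, drop any existing entry for the key, insert the new entry
// (Bucket::insert may evict entries to make room), update the bloom filter
// and write the bucket back. Destructor callbacks for removed/evicted items
// are deferred until the lock is released.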
Status BigHash::insert(HashedKey hk, BufferView value) {
const auto bid = getBucketId(hk);
insertCount_.inc();
uint32_t removed{0};
uint32_t evicted{0};
uint32_t oldRemainingBytes = 0;
uint32_t newRemainingBytes = 0;
// we copy the items and trigger the destructorCb after bucket lock is
// released to avoid possible heavy operations or locks in the destructor.
std::vector<std::tuple<Buffer, Buffer, DestructorEvent>> removedItems;
DestructorCallback cb =
[&removedItems](BufferView key, BufferView val, DestructorEvent event) {
removedItems.emplace_back(key, val, event);
};
{
std::unique_lock<folly::SharedMutex> lock{getMutex(bid)};
auto buffer = readBucket(bid);
if (buffer.isNull()) {
ioErrorCount_.inc();
return Status::DeviceError;
}
auto* bucket = reinterpret_cast<Bucket*>(buffer.data());
oldRemainingBytes = bucket->remainingBytes();
removed = bucket->remove(hk, cb);
evicted = bucket->insert(hk, value, cb);
newRemainingBytes = bucket->remainingBytes();
// rebuild / fix the bloom filter before we move the buffer to do the
// actual write
if (bloomFilter_) {
if (removed + evicted == 0) {
// In case nothing was removed or evicted, we can just add
bloomFilter_->set(bid.index(), hk.keyHash());
} else {
bfRebuild(bid, bucket);
}
}
const auto res = writeBucket(bid, std::move(buffer));
if (!res) {
if (bloomFilter_) {
bloomFilter_->clear(bid.index());
}
ioErrorCount_.inc();
return Status::DeviceError;
}
}
for (const auto& item : removedItems) {
destructorCb_(std::get<0>(item).view() /* key */,
std::get<1>(item).view() /* value */,
std::get<2>(item) /* event */);
}
if (oldRemainingBytes < newRemainingBytes) {
usedSizeBytes_.sub(newRemainingBytes - oldRemainingBytes);
} else {
usedSizeBytes_.add(oldRemainingBytes - newRemainingBytes);
}
sizeDist_.addSize(hk.key().size() + value.size());
itemCount_.add(1);
itemCount_.sub(evicted + removed);
evictionCount_.add(evicted);
logicalWrittenCount_.add(hk.key().size() + value.size());
physicalWrittenCount_.add(bucketSize_);
succInsertCount_.inc();
return Status::Ok;
}
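
// Cheap existence check: consults only the bloom filter under a shared lock
// and never reads the device.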
bool BigHash::couldExist(HashedKey hk) {
const auto bid = getBucketId(hk);
bool canExist;
{
std::shared_lock<folly::SharedMutex> lock{getMutex(bid)};
canExist = !bfReject(bid, hk.keyHash());
}
// The caller is unlikely to issue a subsequent lookup when we return false,
// so tag this as a lookup. If we return that the key could exist, the caller
// will perform a lookupAsync, which is counted within the lookup API.
if (!canExist) {
lookupCount_.inc();
}
return canExist;
}
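
// Lookup flow: reject early via the bloom filter, read the bucket from the
// device under a shared lock, then search the local copy of the bucket
// outside the lock. A miss after the bloom filter passed is counted as a
// bloom filter false positive.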
Status BigHash::lookup(HashedKey hk, Buffer& value) {
const auto bid = getBucketId(hk);
lookupCount_.inc();
Bucket* bucket{nullptr};
Buffer buffer;
// scope of the lock is only needed until we read and mutate state for the
// bucket. Once the bucket is read, the buffer is local and we can find the
// key without holding the lock.
{
std::shared_lock<folly::SharedMutex> lock{getMutex(bid)};
if (bfReject(bid, hk.keyHash())) {
return Status::NotFound;
}
buffer = readBucket(bid);
if (buffer.isNull()) {
ioErrorCount_.inc();
return Status::DeviceError;
}
bucket = reinterpret_cast<Bucket*>(buffer.data());
}
auto valueView = bucket->find(hk);
if (valueView.isNull()) {
bfFalsePositiveCount_.inc();
return Status::NotFound;
}
value = Buffer{valueView};
succLookupCount_.inc();
return Status::Ok;
}
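
// Remove flow: under the bucket's exclusive lock we read the bucket, remove
// the entry if present, rebuild the bloom filter and write the bucket back.
// The removed value is copied so the destructor callback can run after the
// lock is released.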
Status BigHash::remove(HashedKey hk) {
const auto bid = getBucketId(hk);
removeCount_.inc();
uint32_t oldRemainingBytes = 0;
uint32_t newRemainingBytes = 0;
// we copy the items and trigger the destructorCb after bucket lock is
// released to avoid possible heavy operations or locks in the destructor.
Buffer valueCopy;
DestructorCallback cb = [&valueCopy](
BufferView, BufferView value, DestructorEvent) {
valueCopy = Buffer{value};
};
{
std::unique_lock<folly::SharedMutex> lock{getMutex(bid)};
if (bfReject(bid, hk.keyHash())) {
return Status::NotFound;
}
auto buffer = readBucket(bid);
if (buffer.isNull()) {
ioErrorCount_.inc();
return Status::DeviceError;
}
auto* bucket = reinterpret_cast<Bucket*>(buffer.data());
oldRemainingBytes = bucket->remainingBytes();
if (!bucket->remove(hk, cb)) {
bfFalsePositiveCount_.inc();
return Status::NotFound;
}
newRemainingBytes = bucket->remainingBytes();
// We compute the bloom filter before writing the bucket because when
// encryption is enabled, we will "move" the bucket content into writeBucket().
if (bloomFilter_) {
bfRebuild(bid, bucket);
}
const auto res = writeBucket(bid, std::move(buffer));
if (!res) {
if (bloomFilter_) {
bloomFilter_->clear(bid.index());
}
ioErrorCount_.inc();
return Status::DeviceError;
}
}
if (!valueCopy.isNull()) {
destructorCb_(hk.key(), valueCopy.view(), DestructorEvent::Removed);
}
XDCHECK_LE(oldRemainingBytes, newRemainingBytes);
usedSizeBytes_.sub(newRemainingBytes - oldRemainingBytes);
itemCount_.dec();
// We do not bump logicalWrittenCount_ because logically a
// remove operation does not write, but for BigHash, it does
// incur physical writes.
physicalWrittenCount_.add(bucketSize_);
succRemoveCount_.inc();
return Status::Ok;
}
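
// Returns true only when a bloom filter is configured and it definitively
// rules out the key for this bucket; also tracks probe and reject counts.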
bool BigHash::bfReject(BucketId bid, uint64_t keyHash) const {
if (bloomFilter_) {
bfProbeCount_.inc();
if (!bloomFilter_->couldExist(bid.index(), keyHash)) {
bfRejectCount_.inc();
return true;
}
}
return false;
}
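
// Rebuilds this bucket's bloom filter from the entries that remain in the
// bucket after a mutation (remove, or insert with eviction).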
void BigHash::bfRebuild(BucketId bid, const Bucket* bucket) {
bfRebuildCount_.inc();
XDCHECK(bloomFilter_);
bloomFilter_->clear(bid.index());
auto itr = bucket->getFirst();
while (!itr.done()) {
bloomFilter_->set(bid.index(), itr.keyHash());
itr = bucket->getNext(itr);
}
}
void BigHash::flush() {
XLOG(INFO, "Flush big hash");
device_.flush();
}
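
// Reads one bucket worth of data from the device. Returns a null buffer on an
// IO error. If the stored checksum does not match or the bucket's generation
// time differs from the current one, the in-memory copy is re-initialized as
// an empty bucket so stale or uninitialized on-disk data is never interpreted
// as valid entries.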
Buffer BigHash::readBucket(BucketId bid) {
auto buffer = device_.makeIOBuffer(bucketSize_);
XDCHECK(!buffer.isNull());
const bool res =
device_.read(getBucketOffset(bid), buffer.size(), buffer.data());
if (!res) {
return {};
}
auto* bucket = reinterpret_cast<Bucket*>(buffer.data());
const auto checksumSuccess =
Bucket::computeChecksum(buffer.view()) == bucket->getChecksum();
// TODO (T93631284) we only read a bucket if the bloom filter indicates that
// the bucket could have the element. Hence, if the checksum fails and a bloom
// filter is enabled, we could record a checksum error. However, doing so
// could lead to false positives on checksum errors for buckets that were
// never initialized (by writing to them), but were read because the bloom
// filter had a false positive. Hence, we can't differentiate between false
// positives and real checksum errors due to device failures.
//
// if (!checksumSuccess && bloomFilter_) {
// checksumErrorCount_.inc();
// }
if (!checksumSuccess || static_cast<uint64_t>(generationTime_.count()) !=
bucket->generationTime()) {
Bucket::initNew(buffer.mutableView(), generationTime_.count());
}
return buffer;
}
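
// Stamps the checksum into the bucket header and writes the buffer to the
// device at the bucket's offset.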
bool BigHash::writeBucket(BucketId bid, Buffer buffer) {
auto* bucket = reinterpret_cast<Bucket*>(buffer.data());
bucket->setChecksum(Bucket::computeChecksum(buffer.view()));
return device_.write(getBucketOffset(bid), std::move(buffer));
}
} // namespace navy
} // namespace cachelib
} // namespace facebook