cachelib/navy/Factory.cpp
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cachelib/navy/Factory.h"
#include <folly/Format.h>
#include <folly/Random.h>
#include <stdexcept>
#include "cachelib/navy/admission_policy/DynamicRandomAP.h"
#include "cachelib/navy/admission_policy/RejectRandomAP.h"
#include "cachelib/navy/bighash/BigHash.h"
#include "cachelib/navy/block_cache/BlockCache.h"
#include "cachelib/navy/block_cache/FifoPolicy.h"
#include "cachelib/navy/block_cache/LruPolicy.h"
#include "cachelib/navy/driver/Driver.h"
#include "cachelib/navy/serialization/RecordIO.h"
/* O_DIRECT not available on Mac OS */
#ifndef O_DIRECT
#define O_DIRECT 0
#endif
namespace facebook {
namespace cachelib {
namespace navy {
namespace {
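// Builder for a BlockCache engine: the BlockCacheProto setters accumulate a
// BlockCache::Config, and create() materializes the engine from it.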
class BlockCacheProtoImpl final : public BlockCacheProto {
public:
BlockCacheProtoImpl() = default;
~BlockCacheProtoImpl() override = default;
void setLayout(uint64_t baseOffset,
uint64_t size,
uint32_t regionSize) override {
if (size <= 0 || regionSize <= 0) {
throw std::invalid_argument(folly::sformat(
"Invalid layout. size: {}, regionSize: {}.", size, regionSize));
}
config_.cacheBaseOffset = baseOffset;
config_.cacheSize = size;
config_.regionSize = regionSize;
}
void setChecksum(bool enable) override { config_.checksum = enable; }
void setLruEvictionPolicy() override {
if (!(config_.cacheSize > 0 && config_.regionSize > 0)) {
throw std::logic_error("layout is not set");
}
auto numRegions = config_.getNumRegions();
if (config_.evictionPolicy) {
throw std::invalid_argument("There's already an eviction policy set");
}
config_.evictionPolicy = std::make_unique<LruPolicy>(numRegions);
}
void setFifoEvictionPolicy() override {
if (config_.evictionPolicy) {
throw std::invalid_argument("There's already an eviction policy set");
}
config_.evictionPolicy = std::make_unique<FifoPolicy>();
}
void setSegmentedFifoEvictionPolicy(
std::vector<unsigned int> segmentRatio) override {
if (config_.evictionPolicy) {
throw std::invalid_argument("There's already an eviction policy set");
}
config_.numPriorities = static_cast<uint16_t>(segmentRatio.size());
config_.evictionPolicy =
std::make_unique<SegmentedFifoPolicy>(std::move(segmentRatio));
}
void setReadBufferSize(uint32_t size) override {
config_.readBufferSize = size;
}
void setCleanRegionsPool(uint32_t n) override {
config_.cleanRegionsPool = n;
}
void setReinsertionConfig(
const BlockCacheReinsertionConfig& reinsertionConfig) override {
config_.reinsertionConfig = reinsertionConfig;
}
void setDevice(Device* device) { config_.device = device; }
void setNumInMemBuffers(uint32_t numInMemBuffers) override {
config_.numInMemBuffers = numInMemBuffers;
}
void setItemDestructorEnabled(bool itemDestructorEnabled) override {
config_.itemDestructorEnabled = itemDestructorEnabled;
}
void setPreciseRemove(bool preciseRemove) override {
config_.preciseRemove = preciseRemove;
}
std::unique_ptr<Engine> create(JobScheduler& scheduler,
DestructorCallback cb) && {
config_.scheduler = &scheduler;
config_.destructorCb = std::move(cb);
config_.validate();
return std::make_unique<BlockCache>(std::move(config_));
}
private:
BlockCache::Config config_;
};
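// Builder for a BigHash engine: records the layout, optional bloom filter and
// device settings, and materializes the engine in create().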
class BigHashProtoImpl final : public BigHashProto {
public:
BigHashProtoImpl() = default;
~BigHashProtoImpl() override = default;
void setLayout(uint64_t baseOffset,
uint64_t size,
uint32_t bucketSize) override {
config_.cacheBaseOffset = baseOffset;
config_.cacheSize = size;
config_.bucketSize = bucketSize;
}
  // BigHash uses bloom filters (BF) to reduce the number of IOs. We maintain a
  // BF for every bucket, because we want to rebuild it on every remove to keep
  // its filtering properties.
void setBloomFilter(uint32_t numHashes, uint32_t hashTableBitSize) override {
// Want to make @setLayout and Bloom filter setup independent.
bloomFilterEnabled_ = true;
numHashes_ = numHashes;
hashTableBitSize_ = hashTableBitSize;
}
void setDevice(Device* device) { config_.device = device; }
void setDestructorCb(DestructorCallback cb) {
config_.destructorCb = std::move(cb);
}
std::unique_ptr<Engine> create() && {
if (bloomFilterEnabled_) {
if (config_.bucketSize == 0) {
throw std::invalid_argument{"invalid bucket size"};
}
config_.bloomFilter = std::make_unique<BloomFilter>(
config_.numBuckets(), numHashes_, hashTableBitSize_);
}
return std::make_unique<BigHash>(std::move(config_));
}
private:
BigHash::Config config_;
bool bloomFilterEnabled_{false};
uint32_t numHashes_{};
uint32_t hashTableBitSize_{};
};
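// Top-level builder: collects the device, job scheduler, admission policy and
// engine protos, and assembles the final Driver in create().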
class CacheProtoImpl final : public CacheProto {
public:
CacheProtoImpl() = default;
~CacheProtoImpl() override = default;
void setMaxConcurrentInserts(uint32_t limit) override {
config_.maxConcurrentInserts = limit;
}
void setMaxParcelMemory(uint64_t limit) override {
config_.maxParcelMemory = limit;
}
void setDevice(std::unique_ptr<Device> device) override {
config_.device = std::move(device);
}
void setMetadataSize(size_t size) override { config_.metadataSize = size; }
void setBlockCache(std::unique_ptr<BlockCacheProto> proto) override {
blockCacheProto_ = std::move(proto);
}
void setBigHash(std::unique_ptr<BigHashProto> proto,
uint32_t smallItemMaxSize) override {
bigHashProto_ = std::move(proto);
config_.smallItemMaxSize = smallItemMaxSize;
}
void setDestructorCallback(DestructorCallback cb) override {
destructorCb_ = std::move(cb);
}
void setRejectRandomAdmissionPolicy(const RandomAPConfig& config) override {
RejectRandomAP::Config apConfig;
apConfig.probability = config.getAdmProbability();
apConfig.seed = folly::Random::rand32();
config_.admissionPolicy =
std::make_unique<RejectRandomAP>(std::move(apConfig));
}
void setDynamicRandomAdmissionPolicy(
const DynamicRandomAPConfig& config) override {
DynamicRandomAP::Config apConfig;
apConfig.targetRate = config.getAdmWriteRate();
apConfig.fnBytesWritten = [device = config_.device.get()]() {
return device->getBytesWritten();
};
apConfig.seed = folly::Random::rand32();
apConfig.deterministicKeyHashSuffixLength = config.getAdmSuffixLength();
uint32_t itemBaseSize = config.getAdmProbBaseSize();
if (itemBaseSize > 0) {
apConfig.baseSize = itemBaseSize;
}
uint64_t maxRate = config.getMaxWriteRate();
if (maxRate > 0) {
apConfig.maxRate = maxRate;
}
double probFactorLowerBound = config.getProbFactorLowerBound();
double probFactorUpperBound = config.getProbFactorUpperBound();
if (probFactorLowerBound > 0 && probFactorUpperBound > 0) {
apConfig.probFactorLowerBound = probFactorLowerBound;
apConfig.probFactorUpperBound = probFactorUpperBound;
}
config_.admissionPolicy =
std::make_unique<DynamicRandomAP>(std::move(apConfig));
}
void setJobScheduler(std::unique_ptr<JobScheduler> ex) override {
config_.scheduler = std::move(ex);
}
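  // Builds the configured engines against the shared device and scheduler,
  // then wraps them, together with the admission policy, in a Driver.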
std::unique_ptr<AbstractCache> create() && {
if (config_.scheduler == nullptr) {
throw std::invalid_argument("scheduler is not set");
}
if (blockCacheProto_) {
auto bcProto = dynamic_cast<BlockCacheProtoImpl*>(blockCacheProto_.get());
if (bcProto != nullptr) {
bcProto->setDevice(config_.device.get());
config_.largeItemCache =
std::move(*bcProto).create(*config_.scheduler, destructorCb_);
}
}
if (bigHashProto_) {
auto bhProto = dynamic_cast<BigHashProtoImpl*>(bigHashProto_.get());
if (bhProto != nullptr) {
bhProto->setDevice(config_.device.get());
bhProto->setDestructorCb(destructorCb_);
config_.smallItemCache = std::move(*bhProto).create();
}
}
return std::make_unique<Driver>(std::move(config_));
}
private:
DestructorCallback destructorCb_;
std::unique_ptr<BlockCacheProto> blockCacheProto_;
std::unique_ptr<BigHashProto> bigHashProto_;
Driver::Config config_;
};
// Opens the cache file @fileName and sets its size to @size.
// Throws std::system_error on failure.
folly::File openCacheFile(const std::string& fileName,
uint64_t size,
bool truncate) {
XLOG(INFO) << "Cache file: " << fileName << " size: " << size
<< " truncate: " << truncate;
if (fileName.empty()) {
throw std::invalid_argument("File name is empty");
}
int flags{O_RDWR | O_CREAT};
  // Try opening with O_DIRECT first. In tests we might get a file on tmpfs,
  // which may not support O_DIRECT, so fall back to opening without it in
  // that case.
folly::File f;
try {
f = folly::File(fileName.c_str(), flags | O_DIRECT);
} catch (const std::system_error& e) {
if (e.code().value() == EINVAL) {
XLOG(ERR) << "Failed to open with o-direct, trying without. Error: "
<< e.what();
f = folly::File(fileName.c_str(), flags);
} else {
throw;
}
}
XDCHECK_GE(f.fd(), 0);
#ifndef MISSING_FALLOCATE
// TODO: T95780876 detect if file exists and is of expected size. If not,
// automatically fallocate the file or ftruncate the file.
if (truncate && ::fallocate(f.fd(), 0, 0, size) < 0) {
throw std::system_error(
errno,
std::system_category(),
folly::sformat("failed fallocate with size {}", size));
}
#endif
#ifndef MISSING_FADVISE
if (::posix_fadvise(f.fd(), 0, size, POSIX_FADV_DONTNEED) < 0) {
throw std::system_error(errno, std::system_category(),
"Error fadvising cache file");
}
#endif
return f;
}
} // namespace
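// The functions below are the public entry points of the factory. A minimal,
// illustrative wiring (not a complete configuration; the file path, `fileSize`
// and `scheduler` are placeholders) looks roughly like this:
//
//   auto proto = createCacheProto();
//   proto->setDevice(createFileDevice(
//       "/mnt/flash/navy", fileSize, true /* truncate */,
//       4096 /* blockSize */, nullptr /* encryptor */,
//       0 /* maxDeviceWriteSize */));
//   proto->setJobScheduler(std::move(scheduler));
//   auto bc = createBlockCacheProto();
//   bc->setLayout(0 /* baseOffset */, fileSize, 16 * 1024 * 1024);
//   bc->setLruEvictionPolicy();
//   proto->setBlockCache(std::move(bc));
//   std::unique_ptr<AbstractCache> cache = createCache(std::move(proto));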
std::unique_ptr<BlockCacheProto> createBlockCacheProto() {
return std::make_unique<BlockCacheProtoImpl>();
}
std::unique_ptr<BigHashProto> createBigHashProto() {
return std::make_unique<BigHashProtoImpl>();
}
std::unique_ptr<CacheProto> createCacheProto() {
return std::make_unique<CacheProtoImpl>();
}
std::unique_ptr<AbstractCache> createCache(std::unique_ptr<CacheProto> proto) {
return std::move(dynamic_cast<CacheProtoImpl&>(*proto)).create();
}
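// Creates a RAID0 device striped across the given file paths. Each path is
// opened (and optionally truncated) to @fdSize bytes before being handed to
// the RAID0 device.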
std::unique_ptr<Device> createRAIDDevice(
std::vector<std::string> raidPaths,
uint64_t fdSize,
bool truncateFile,
uint32_t blockSize,
uint32_t stripeSize,
std::shared_ptr<navy::DeviceEncryptor> encryptor,
uint32_t maxDeviceWriteSize) {
  // File paths are opened in increasing lexicographic order of the path
  // string. This ensures that the RAID0 stripes aren't out of order even if
  // the caller changes the order of the file paths. We can recover the cache
  // as long as all the paths are specified, regardless of their order.
std::sort(raidPaths.begin(), raidPaths.end());
std::vector<folly::File> fileVec;
for (const auto& path : raidPaths) {
folly::File f;
try {
f = openCacheFile(path, fdSize, truncateFile);
} catch (const std::exception& e) {
XLOG(ERR) << "Exception in openCacheFile: " << path << e.what()
<< ". Errno: " << errno;
throw;
}
fileVec.push_back(std::move(f));
}
return createDirectIoRAID0Device(std::move(fileVec),
fdSize,
blockSize,
stripeSize,
std::move(encryptor),
maxDeviceWriteSize);
}
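// Creates a direct-IO device backed by a single file of @singleFileSize bytes.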
std::unique_ptr<Device> createFileDevice(
std::string fileName,
uint64_t singleFileSize,
bool truncateFile,
uint32_t blockSize,
std::shared_ptr<navy::DeviceEncryptor> encryptor,
uint32_t maxDeviceWriteSize) {
folly::File f;
try {
f = openCacheFile(fileName, singleFileSize, truncateFile);
} catch (const std::exception& e) {
XLOG(ERR) << "Exception in openCacheFile: " << e.what();
throw;
}
return createDirectIoFileDevice(std::move(f), singleFileSize, blockSize,
std::move(encryptor), maxDeviceWriteSize);
}
} // namespace navy
} // namespace cachelib
} // namespace facebook