cachelib/navy/driver/Driver.cpp (354 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cachelib/navy/driver/Driver.h"
#include <folly/synchronization/Baton.h>
#include "cachelib/navy/admission_policy/DynamicRandomAP.h"
#include "cachelib/navy/common/Hash.h"
#include "cachelib/navy/driver/NoopEngine.h"
#include "folly/Format.h"
namespace facebook {
namespace cachelib {
namespace navy {
// Sanity-checks the small item cache settings of this config.
//
// When a small item cache engine is configured, smallItemMaxSize must be
// non-zero and must not exceed the limit the engine reports through
// getMaxItemSize(). Configs without a small item cache are accepted as is.
//
// @return *this, so the call can be chained.
// @throw std::invalid_argument if either constraint above is violated.
Driver::Config& Driver::Config::validate() {
  if (smallItemCache == nullptr) {
    // No small item engine: there is nothing to cross-check.
    return *this;
  }
  if (smallItemMaxSize == 0) {
    throw std::invalid_argument("invalid small item cache params");
  }
  const auto engineLimit = smallItemCache->getMaxItemSize();
  if (smallItemMaxSize > engineLimit) {
    throw std::invalid_argument(folly::sformat(
        "small item max size should not excceed: {}, but is set to be: {}",
        engineLimit, smallItemMaxSize));
  }
  return *this;
}
Driver::Driver(Config&& config)
: Driver{std::move(config.validate()), ValidConfigTag{}} {}
// Tagged constructor: takes over the components of a config and fills in
// no-op engines for the ones that were not provided.
//
// smallItemMaxSize_ is forced to 0 when no small item cache is configured,
// which makes isItemLarge() classify every item as large.
// NOTE(review): smallItemMaxSize_ reads config.smallItemCache, which is moved
// from further down the list; this relies on smallItemMaxSize_ being declared
// before smallItemCache_ in the class - confirm against the header.
Driver::Driver(Config&& config, ValidConfigTag)
    : smallItemMaxSize_{config.smallItemCache ? config.smallItemMaxSize : 0},
      maxConcurrentInserts_{config.maxConcurrentInserts},
      maxParcelMemory_{config.maxParcelMemory},
      metadataSize_{config.metadataSize},
      device_{std::move(config.device)},
      scheduler_{std::move(config.scheduler)},
      largeItemCache_{std::move(config.largeItemCache)},
      smallItemCache_{std::move(config.smallItemCache)},
      admissionPolicy_{std::move(config.admissionPolicy)} {
  // The rest of the driver dereferences both engines unconditionally, so a
  // missing engine is replaced by a NoopEngine instead of staying null.
  if (largeItemCache_ == nullptr) {
    XLOG(INFO, "Large item cache is noop");
    largeItemCache_ = std::make_unique<NoopEngine>();
  }
  if (smallItemCache_ == nullptr) {
    XLOG(INFO, "Small item cache is noop");
    smallItemCache_ = std::make_unique<NoopEngine>();
  }

  XLOGF(INFO, "Max concurrent inserts: {}", maxConcurrentInserts_);
  XLOGF(INFO, "Max parcel memory: {}", maxParcelMemory_);
}
// Destructor. Finishes the scheduler and releases it explicitly before the
// remaining members are torn down by their own destructors.
Driver::~Driver() {
XLOG(INFO, "Driver: finish scheduler");
// Presumably blocks until all queued jobs have completed - confirm against
// the scheduler implementation.
scheduler_->finish();
XLOG(INFO, "Driver: finish scheduler successful");
// Destroy the scheduler first for safety: the jobs it runs capture `this`
// and use the engines, so it is released ahead of the other members.
scheduler_.reset();
}
// Picks the engine pair for an item.
//
// @return {target, other}: `first` is the engine the item belongs to based
//         on isItemLarge(), `second` is the remaining engine.
std::pair<Engine&, Engine&> Driver::select(BufferView key,
                                           BufferView value) const {
  Engine& largeEngine = *largeItemCache_;
  Engine& smallEngine = *smallItemCache_;
  if (isItemLarge(key, value)) {
    return {largeEngine, smallEngine};
  }
  return {smallEngine, largeEngine};
}
// An item is "large" when key and value together take strictly more bytes
// than smallItemMaxSize_.
bool Driver::isItemLarge(BufferView key, BufferView value) const {
  const auto itemSize = key.size() + value.size();
  return itemSize > smallItemMaxSize_;
}
// Asks both engines whether the key might be present. The small item cache
// is consulted first; the large one only if the small one says no.
//
// lookupCount_ is bumped only for a negative answer.
// NOTE(review): presumably a positive answer is followed by a real lookup
// that counts itself - confirm with callers.
bool Driver::couldExist(BufferView key) {
  const HashedKey hk{key};
  if (smallItemCache_->couldExist(hk) || largeItemCache_->couldExist(hk)) {
    return true;
  }
  lookupCount_.inc();
  return false;
}
// Synchronous insert built on top of insertAsync(): schedules the insert and
// blocks the calling thread until the completion callback has fired.
//
// @return the status from insertAsync() if the job was not scheduled,
//         otherwise the status delivered to the completion callback.
Status Driver::insert(BufferView key, BufferView value) {
  folly::Baton<> finished;
  Status jobStatus{Status::Ok};
  // Capturing by reference is safe here because this frame outlives the job:
  // we do not return before the baton is posted.
  const auto scheduleStatus = insertAsync(
      key, value, [&finished, &jobStatus](Status s, BufferView /* key */) {
        jobStatus = s;
        finished.post();
      });
  if (scheduleStatus == Status::Ok) {
    finished.wait();
    return jobStatus;
  }
  // Not scheduled: the callback will never run, so do not wait for it.
  return scheduleStatus;
}
// Decides whether an insert may proceed.
//
// The parcel (key + value bytes) and the insert are optimistically added to
// parcelMemory_ / concurrentInserts_ first and the post-increment values are
// checked afterwards. Checking and incrementing cannot be split, and checking
// before incrementing would over-admit under concurrency.
//
// On acceptance the increments stay in place (insertAsync()'s job undoes them
// on completion); on rejection they are reverted here.
//
// @return true if the item is admitted, false if the admission policy, the
//         concurrent insert limit or the parcel memory limit rejects it.
bool Driver::admissionTest(HashedKey hk, BufferView value) const {
  const size_t parcelSize = hk.key().size() + value.size();
  const auto memoryAfterAdd = parcelMemory_.add_fetch(parcelSize);
  const auto insertsAfterAdd = concurrentInserts_.add_fetch(1);

  const bool policyAccepts =
      !admissionPolicy_ || admissionPolicy_->accept(hk, value);
  if (policyAccepts) {
    if (insertsAfterAdd > maxConcurrentInserts_) {
      rejectedConcurrentInsertsCount_.inc();
    } else if (memoryAfterAdd > maxParcelMemory_) {
      rejectedParcelMemoryCount_.inc();
    } else {
      acceptedCount_.inc();
      acceptedBytes_.add(parcelSize);
      return true;
    }
  }

  // Rejected by the policy or by one of the limits.
  rejectedCount_.inc();
  rejectedBytes_.add(parcelSize);

  // Undo the optimistic increments. The counters are atomic, so the old value
  // cannot simply be assigned back.
  concurrentInserts_.dec();
  parcelMemory_.sub(parcelSize);
  return false;
}
// Schedules an asynchronous insert of (key, value).
//
// The item is rejected up front if the key is longer than kMaxKeySize or if
// admissionTest() refuses it. Otherwise a write job is enqueued that:
//   1. inserts the item into the engine chosen by select(),
//   2. removes the key from the other engine so only one copy remains,
//   3. invokes `cb` (if set) with the final status and the key,
//   4. releases the parcel memory / concurrent insert budget taken by
//      admissionTest() and updates the result counters.
// Either step reschedules the job when the engine answers Status::Retry; a
// completed insertion is not repeated on the re-run.
//
// NOTE(review): the job captures `hk` and `value` by copy of the view types;
// presumably these are non-owning, so the caller must keep the underlying
// bytes alive until `cb` runs - confirm.
// NOTE(review): when the job is rescheduled from the removal step, the re-run
// starts from Status::Ok rather than the status the insertion returned.
//
// @param key   item key.
// @param value item payload.
// @param cb    optional completion callback; only invoked if the job was
//              scheduled (i.e. this function returned Status::Ok).
// @return Status::Rejected if the item was not admitted, Status::Ok if the
//         job was enqueued.
Status Driver::insertAsync(BufferView key,
                           BufferView value,
                           InsertCallback cb) {
  insertCount_.inc();

  const HashedKey hk{key};
  if (key.size() > kMaxKeySize) {
    rejectedCount_.inc();
    rejectedBytes_.add(hk.key().size() + value.size());
    return Status::Rejected;
  }

  if (!admissionTest(hk, value)) {
    return Status::Rejected;
  }

  scheduler_->enqueueWithKey(
      [this, cb = std::move(cb), hk, value, skipInsertion = false]() mutable {
        auto selection = select(hk.key(), value);
        Status status = Status::Ok;
        if (!skipInsertion) {
          status = selection.first.insert(hk, value);
          if (status == Status::Retry) {
            return JobExitCode::Reschedule;
          }
          // Remember that the insertion is done so that a reschedule caused
          // by the removal below does not insert a second time.
          skipInsertion = true;
        }
        if (status != Status::DeviceError) {
          auto rs = selection.second.remove(hk);
          // Check the removal result here, not `status`: `status` can never
          // be Retry at this point, and a Retry from the other engine must
          // reschedule the job instead of being reported as BadState.
          if (rs == Status::Retry) {
            return JobExitCode::Reschedule;
          }
          if (rs != Status::Ok && rs != Status::NotFound) {
            XLOGF(ERR, "Insert failed to remove other: {}", toString(rs));
            status = Status::BadState;
          }
        }

        if (cb) {
          cb(status, hk.key());
        }

        // Give back the budget reserved by admissionTest().
        parcelMemory_.sub(hk.key().size() + value.size());
        concurrentInserts_.dec();

        switch (status) {
        case Status::Ok:
          succInsertCount_.inc();
          break;
        case Status::BadState:
        case Status::DeviceError:
          ioErrorCount_.inc();
          break;
        default:;
        }
        return JobExitCode::Done;
      },
      "insert",
      JobType::Write,
      hk.keyHash());
  return Status::Ok;
}
// Records the outcome of a lookup: Ok counts as a successful lookup,
// DeviceError as an IO error; every other status leaves the counters alone.
void Driver::updateLookupStats(Status status) const {
  if (status == Status::Ok) {
    succLookupCount_.inc();
  } else if (status == Status::DeviceError) {
    ioErrorCount_.inc();
  }
}
// Synchronous lookup. The large item cache is queried first; the small item
// cache is queried only if the large one reports NotFound.
//
// Retries are handled by busy waiting (yielding the thread between
// attempts) because many retries are not expected.
//
// @param key   key to look up.
// @param value output buffer handed to the engines.
// @return the final engine status (never Status::Retry).
Status Driver::lookup(BufferView key, Buffer& value) {
  lookupCount_.inc();
  const HashedKey hk{key};

  Status status = largeItemCache_->lookup(hk, value);
  while (status == Status::Retry) {
    std::this_thread::yield();
    status = largeItemCache_->lookup(hk, value);
  }

  if (status == Status::NotFound) {
    status = smallItemCache_->lookup(hk, value);
    while (status == Status::Retry) {
      std::this_thread::yield();
      status = smallItemCache_->lookup(hk, value);
    }
  }

  updateLookupStats(status);
  return status;
}
// Schedules an asynchronous lookup as a read job.
//
// The job queries the large item cache first and falls back to the small
// item cache on NotFound. A Retry from either engine reschedules the job;
// once the large item cache has given a final answer it is not queried
// again on the re-run. The result is handed to `cb` together with the key
// and the value buffer.
//
// @param cb completion callback; expected to be set (debug-checked).
// @return always Status::Ok - the lookup outcome is only reported via `cb`.
Status Driver::lookupAsync(BufferView key, LookupCallback cb) {
  lookupCount_.inc();
  const HashedKey hk{key};
  XDCHECK(cb);

  scheduler_->enqueueWithKey(
      [this, cb = std::move(cb), hk, largeLookupDone = false]() mutable {
        Buffer result;
        Status lookupStatus{Status::NotFound};

        if (!largeLookupDone) {
          lookupStatus = largeItemCache_->lookup(hk, result);
          if (lookupStatus == Status::Retry) {
            return JobExitCode::Reschedule;
          }
          largeLookupDone = true;
        }

        // Either the large item cache missed, or it already missed on a
        // previous run of this job.
        if (lookupStatus == Status::NotFound) {
          lookupStatus = smallItemCache_->lookup(hk, result);
          if (lookupStatus == Status::Retry) {
            return JobExitCode::Reschedule;
          }
        }

        if (cb) {
          cb(lookupStatus, hk.key(), std::move(result));
        }
        updateLookupStats(lookupStatus);
        return JobExitCode::Done;
      },
      "lookup",
      JobType::Read,
      hk.keyHash());
  return Status::Ok;
}
// Performs one removal attempt for `hk` across both engines.
//
// The small item cache is tried first unless `skipSmallItemCache` is set.
// If that yields NotFound (or was skipped), the large item cache is tried
// and `skipSmallItemCache` is set to true, so a retried call goes straight
// to the large item cache.
//
// Note that removeCount_ is incremented on every call, including retries.
//
// @param skipSmallItemCache in/out retry state owned by the caller.
// @return status of the last engine call; may be Status::Retry.
Status Driver::removeHashedKey(HashedKey hk, bool& skipSmallItemCache) {
  removeCount_.inc();

  Status status =
      skipSmallItemCache ? Status::NotFound : smallItemCache_->remove(hk);
  if (status == Status::NotFound) {
    status = largeItemCache_->remove(hk);
    skipSmallItemCache = true;
  }

  if (status == Status::Ok) {
    succRemoveCount_.inc();
  } else if (status == Status::DeviceError) {
    ioErrorCount_.inc();
  }
  return status;
}
// Synchronous remove: repeats removeHashedKey() until it stops answering
// Status::Retry, yielding the thread between attempts.
//
// @return the first non-Retry status from removeHashedKey().
Status Driver::remove(BufferView key) {
  const HashedKey hk{key};
  bool skipSmallItemCache = false;
  for (;;) {
    const Status status = removeHashedKey(hk, skipSmallItemCache);
    if (status != Status::Retry) {
      return status;
    }
    std::this_thread::yield();
  }
}
// Schedules an asynchronous remove as a write job.
//
// The job calls removeHashedKey() and reschedules itself while the result is
// Status::Retry; the skip flag is kept in the job so a re-run does not
// revisit the small item cache once it has been ruled out. On completion
// `cb` (if set) receives the final status and the key.
//
// @return always Status::Ok - the removal outcome is only reported via `cb`.
Status Driver::removeAsync(BufferView key, RemoveCallback cb) {
  const HashedKey hk{key};
  scheduler_->enqueueWithKey(
      [this, cb = std::move(cb), hk = hk, smallCacheDone = false]() mutable {
        const auto removeStatus = removeHashedKey(hk, smallCacheDone);
        if (removeStatus == Status::Retry) {
          return JobExitCode::Reschedule;
        }
        if (cb) {
          cb(removeStatus, hk.key());
        }
        return JobExitCode::Done;
      },
      "remove",
      JobType::Write,
      hk.keyHash());
  return Status::Ok;
}
// Finishes the scheduler and then flushes both engines, small item cache
// first.
void Driver::flush() {
// Presumably waits for all outstanding jobs so the engines are flushed with
// every scheduled operation applied - confirm against the scheduler.
scheduler_->finish();
smallItemCache_->flush();
largeItemCache_->flush();
}
// Resets Navy: finishes the scheduler, resets both engines (small item cache
// first) and, if an admission policy is configured, resets it as well.
void Driver::reset() {
  XLOG(INFO, "Reset Navy");
  scheduler_->finish();
  smallItemCache_->reset();
  largeItemCache_->reset();
  if (!admissionPolicy_) {
    // No admission policy configured; nothing more to reset.
    return;
  }
  admissionPolicy_->reset();
}
// Writes the metadata of both engines to the device's metadata area.
// The large item cache is persisted before the small item cache, the same
// order recover() reads them back in. Does nothing if no metadata record
// writer could be created.
void Driver::persist() const {
  auto writer = createMetadataRecordWriter(*device_, metadataSize_);
  if (!writer) {
    return;
  }
  largeItemCache_->persist(*writer);
  smallItemCache_->persist(*writer);
}
// Restores both engines from the persisted metadata.
//
// The large item cache is recovered first, then the small item cache; the
// small one is not attempted if the large one fails. If either fails the
// whole driver is reset. After a successful recovery the persisted metadata
// is invalidated.
//
// @return false if there is no reader, no persisted data, an engine failed
//         to recover, or no writer could be created for the invalidation;
//         otherwise the result of invalidating the metadata.
bool Driver::recover() {
  auto reader = createMetadataRecordReader(*device_, metadataSize_);
  if (!reader || reader->isEnd()) {
    return false;
  }

  // Because we insert item and remove from the other engine, partial recovery
  // is potentially possible.
  const bool enginesRecovered =
      largeItemCache_->recover(*reader) && smallItemCache_->recover(*reader);
  if (!enginesRecovered) {
    reset();
    return false;
  }

  // If recovery is successful, invalidate the metadata
  auto writer = createMetadataRecordWriter(*device_, metadataSize_);
  if (!writer) {
    return false;
  }
  return writer->invalidate();
}
// Updates the maximum write rate of the admission policy, provided the
// configured policy is a DynamicRandomAP.
//
// @return true if the rate was applied; false if there is no admission
//         policy or it is of a different type.
bool Driver::updateMaxRateForDynamicRandomAP(uint64_t maxRate) {
  auto* dynamicAp = dynamic_cast<DynamicRandomAP*>(admissionPolicy_.get());
  if (dynamicAp == nullptr) {
    return false;
  }
  dynamicAp->setMaxWriteRate(maxRate);
  return true;
}
// Returns the size reported by the underlying device.
// NOTE(review): device_ is dereferenced without a null check here, while
// getCounters() treats it as possibly null in driver tests - confirm callers
// never reach this without a device.
uint64_t Driver::getSize() const { return device_->getSize(); }
// Reports all driver level counters to `visitor` as (name, value) pairs and
// then lets the scheduler, both engines, the admission policy (if any) and
// the device (if any) report their own counters through the same visitor.
void Driver::getCounters(const CounterVisitor& visitor) const {
// Operation counts: attempts and successes for insert / lookup / remove.
visitor("navy_inserts", insertCount_.get());
visitor("navy_succ_inserts", succInsertCount_.get());
visitor("navy_lookups", lookupCount_.get());
visitor("navy_succ_lookups", succLookupCount_.get());
visitor("navy_removes", removeCount_.get());
visitor("navy_succ_removes", succRemoveCount_.get());
// Admission outcomes, including the per-limit rejection reasons.
visitor("navy_rejected", rejectedCount_.get());
visitor("navy_rejected_concurrent_inserts",
rejectedConcurrentInsertsCount_.get());
visitor("navy_rejected_parcel_memory", rejectedParcelMemoryCount_.get());
visitor("navy_rejected_bytes", rejectedBytes_.get());
visitor("navy_accepted_bytes", acceptedBytes_.get());
visitor("navy_accepted", acceptedCount_.get());
visitor("navy_io_errors", ioErrorCount_.get());
// Current values of the in-flight insert budget tracked by admissionTest().
visitor("navy_parcel_memory", parcelMemory_.get());
visitor("navy_concurrent_inserts", concurrentInserts_.get());
// Counters owned by the components.
scheduler_->getCounters(visitor);
largeItemCache_->getCounters(visitor);
smallItemCache_->getCounters(visitor);
if (admissionPolicy_) {
admissionPolicy_->getCounters(visitor);
}
// Can be nullptr in driver tests
if (device_) {
device_->getCounters(visitor);
}
}
} // namespace navy
} // namespace cachelib
} // namespace facebook