cachelib/navy/admission_policy/DynamicRandomAP.cpp (179 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "cachelib/navy/admission_policy/DynamicRandomAP.h" #include <folly/Format.h> #include <folly/hash/Hash.h> #include <folly/lang/Bits.h> #include <algorithm> #include <shared_mutex> #include <utility> #include "cachelib/navy/common/Utils.h" namespace facebook { namespace cachelib { namespace navy { namespace { // Clamp the input into range [lower, upper] double clamp(double input, double lower, double upper) { return std::min(std::max(input, lower), upper); } } // namespace DynamicRandomAP::Config& DynamicRandomAP::Config::validate() { if (targetRate == 0) { throw std::invalid_argument{folly::sformat( "Target rate must be greater than 0. Target rate: {}", targetRate)}; } if (updateInterval == std::chrono::seconds{0}) { throw std::invalid_argument{ folly::sformat("Update interval must be greater than 0. Interval: {}", updateInterval.count())}; } if (probabilitySeed <= 0) { throw std::invalid_argument{folly::sformat( "Probability seed must be greater than 0. Seed: {}", probabilitySeed)}; } if (baseSize == 0) { throw std::invalid_argument{folly::sformat( "Base size must be greater than 0. Base size: {}", baseSize)}; } if (!between(probabilitySizeDecay, 0, 1)) { throw std::invalid_argument{folly::sformat( "Probability size decay must be in range [0, 1]. Decay: {}", probabilitySizeDecay)}; } if (!betweenStrict(changeWindow, 0, 1)) { throw std::invalid_argument{folly::sformat( "Change window must be in range (0, 1). Change: {}", changeWindow)}; } if (!fnBytesWritten) { throw std::invalid_argument{"fnBytesWritten function required"}; } return *this; } DynamicRandomAP::DynamicRandomAP(Config&& config) : DynamicRandomAP{std::move(config.validate()), ValidConfigTag{}} {} DynamicRandomAP::DynamicRandomAP(Config&& config, ValidConfigTag) : targetRate_{config.targetRate}, maxRate_{config.maxRate}, updateInterval_{config.updateInterval}, baseProbabilityMultiplier_{folly::findLastSet(config.baseSize)}, probabilitySeed_{config.probabilitySeed}, baseProbabilityExponent_{config.probabilitySizeDecay}, maxChange_{1.0 + config.changeWindow}, minChange_{1.0 - config.changeWindow}, fnBytesWritten_{std::move(config.fnBytesWritten)}, lowerBound_{config.probFactorLowerBound}, upperBound_{config.probFactorUpperBound}, rg_{config.seed}, deterministicKeyHashSuffixLength_{ config.deterministicKeyHashSuffixLength} { reset(); XLOGF(INFO, "DynamicRandomAP: target rate {} byte/s, update interval {} s. " "maxChange {}, minChange {}, kUpperBound_ {}, kLowerBound_ {}.", targetRate_, updateInterval_.count(), maxChange_, minChange_, kUpperBound_, kLowerBound_); } bool DynamicRandomAP::accept(HashedKey hk, BufferView value) { const auto curTime = getSteadyClockSeconds(); auto params = getThrottleParams(); if (curTime - params.updateTime >= updateInterval_) { // Lots of threads can get into this section. First to grab the lock will // update. Let proceed the rest. std::unique_lock<folly::SharedMutex> lock{mutex_, std::try_to_lock}; if (lock.owns_lock()) { updateThrottleParams(curTime); } } auto baseProb = getBaseProbability(hk.key().size() + value.size()); baseProbStats_.trackValue(baseProb * 100); auto probability = baseProb * params.probabilityFactor; probability = std::max(0.0, std::min(1.0, probability)); return probability == 1 || genF(hk) < probability; } // generate a value between [0, 1) double DynamicRandomAP::genF(const HashedKey& hk) const { if (deterministicKeyHashSuffixLength_) { auto key = hk.key(); size_t len = key.size() > deterministicKeyHashSuffixLength_ ? key.size() - deterministicKeyHashSuffixLength_ : key.size(); uint64_t h = folly::hash::SpookyHashV2::Hash64(key.data(), len, 0 /* seed */); return fdiv(static_cast<double>(h), static_cast<double>(std::numeric_limits<decltype(h)>::max())); } return fdiv(static_cast<double>(rg_()), static_cast<double>(rg_.max())); } void DynamicRandomAP::reset() { startupTime_ = getSteadyClockSeconds(); params_.updateTime = startupTime_; params_.probabilityFactor = probabilitySeed_; params_.curTargetRate = 0; params_.bytesWrittenLastUpdate = fnBytesWritten_(); } void DynamicRandomAP::update() { std::unique_lock<folly::SharedMutex> lock{mutex_}; updateThrottleParams(getSteadyClockSeconds()); } void DynamicRandomAP::updateThrottleParams(std::chrono::seconds curTime) { constexpr uint64_t kSecondsInDay{3600 * 24}; auto updateTimeDelta = (curTime - params_.updateTime).count(); // in case this is the first update, or we are in unit test where curTime is // set arbitrarily. if (updateTimeDelta <= 0) { updateTimeDelta = updateInterval_.count(); } params_.updateTime = curTime; auto bytesWritten = fnBytesWritten_(); params_.observedCurRate_ = (bytesWritten - params_.bytesWrittenLastUpdate) / updateTimeDelta; if (params_.observedCurRate_ == 0) { return; } auto secondsElapsed = (curTime - startupTime_).count(); auto targetWrittenTomorrow = targetRate_ * (secondsElapsed + kSecondsInDay); uint64_t curTargetRate{0}; if (bytesWritten < targetWrittenTomorrow) { // Write rate needed to achieve @targetWrittenTomorrow given that we have // currently written @bytesWritten. curTargetRate = (targetWrittenTomorrow - bytesWritten) / kSecondsInDay; } if (maxRate_ > 0 && curTargetRate > maxRate_) { XLOGF(INFO, "max write rate {} will be used because target current write rate {} " "exceeds it.", maxRate_, curTargetRate); curTargetRate = maxRate_; } params_.curTargetRate = curTargetRate; auto rawProbFactor = fdiv(static_cast<double>(curTargetRate), static_cast<double>(params_.observedCurRate_)); params_.probabilityFactor = clampFactor(params_.probabilityFactor * clampFactorChange(rawProbFactor)); XLOG_EVERY_MS(INFO, 60000) << "observed current write rate = " << params_.observedCurRate_ << ", target current rate = " << curTargetRate << " rawProbFactor = " << rawProbFactor << ", probFactor = " << params_.probabilityFactor; params_.bytesWrittenLastUpdate = bytesWritten; } double DynamicRandomAP::clampFactorChange(double change) const { return clamp(change, minChange_, maxChange_); } double DynamicRandomAP::clampFactor(double factor) const { return clamp(factor, lowerBound_, upperBound_); } double DynamicRandomAP::getBaseProbability(uint64_t size) const { // Get index of most significant bit. This buckets the item sizes. uint32_t log2Size = folly::findLastSet(size); return std::pow(fdiv(baseProbabilityMultiplier_, log2Size), baseProbabilityExponent_); } DynamicRandomAP::ThrottleParams DynamicRandomAP::getThrottleParams() const { std::shared_lock<folly::SharedMutex> lock{mutex_}; return params_; } void DynamicRandomAP::getCounters(const CounterVisitor& visitor) const { auto params = getThrottleParams(); visitor("navy_ap_write_rate_target_configured", static_cast<double>(targetRate_)); visitor("navy_ap_write_rate_max_configured", static_cast<double>(maxRate_)); visitor("navy_ap_write_rate_adjusted_target", static_cast<double>(params.curTargetRate)); visitor("navy_ap_prob_factor_x100", params_.probabilityFactor * 100); baseProbStats_.visitQuantileEstimator(visitor, "navy_ap_baseProx_x100"); visitor("navy_ap_write_rate_observed", static_cast<double>(params_.observedCurRate_)); } } // namespace navy } // namespace cachelib } // namespace facebook