cachelib/allocator/MemoryMonitor.cpp (255 lines of code) (raw):

/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "cachelib/allocator/MemoryMonitor.h"

#include <folly/logging/xlog.h>

#include <chrono>
#include <numeric>
#include <stdexcept>

#include "cachelib/allocator/PoolResizeStrategy.h"
#include "cachelib/common/Exceptions.h"

namespace facebook {
namespace cachelib {

constexpr size_t kGBytes = 1024 * 1024 * 1024;

// Builds a monitor for `cache` from `config`. The GB limits in the config are
// converted to bytes. When no strategy is supplied, a PoolResizeStrategy is
// used to pick the allocation classes that give up slabs.
MemoryMonitor::MemoryMonitor(CacheBase& cache,
                             Config config,
                             std::shared_ptr<RebalanceStrategy> strategy)
    : cache_(cache),
      mode_(config.mode),
      strategy_(std::move(strategy)),
      percentAdvisePerIteration_(config.maxAdvisePercentPerIter),
      percentReclaimPerIteration_(config.maxReclaimPercentPerIter),
      lowerLimit_(config.lowerLimitGB * kGBytes),
      upperLimit_(config.upperLimitGB * kGBytes),
      maxLimitPercent_(config.maxAdvisePercent),
      reclaimRateLimitWindowSecs_(config.reclaimRateLimitWindowSecs),
      rateLimiter_(
          // Detect rate of decrease in free memory and
          // rate of increase in resident memory mode
          config.mode != FreeMemory) {
  if (!strategy_) {
    strategy_ = std::make_shared<PoolResizeStrategy>();
  }
  // There should be at least a slab worth of difference between upper
  // and lower memory limits.
  // NOTE(review): if upperLimit_ is unsigned and smaller than Slab::kSize the
  // subtraction wraps and this check passes vacuously -- confirm intended.
  XDCHECK_LT(lowerLimit_, upperLimit_ - Slab::kSize);
}

// Stops the worker on destruction; exceptions from stop() are swallowed so
// that the destructor never throws.
MemoryMonitor::~MemoryMonitor() {
  try {
    stop();
  } catch (const std::exception&) {
  }
}

// One monitoring iteration: refreshes the rate limiter window and dispatches
// to the check matching the configured mode.
//
// @throw std::runtime_error if the mode is not one of the handled values.
void MemoryMonitor::work() {
  // Poll interval can change. Keep rate limiter window size updated.
  auto intervalSecs =
      std::chrono::duration_cast<std::chrono::seconds>(getInterval()).count();
  // duration_cast truncates towards zero, so a sub-second (or zero) interval
  // would become 0 here and the division below would be a division by zero.
  // Treat such intervals as one second; the window is then
  // reclaimRateLimitWindowSecs_ samples long.
  if (intervalSecs <= 0) {
    intervalSecs = 1;
  }
  rateLimiter_.setWindowSize(reclaimRateLimitWindowSecs_.count() /
                             intervalSecs);

  switch (mode_) {
  case FreeMemory:
    checkFreeMemory();
    break;
  case ResidentMemory:
    checkResidentMemory();
    break;
  case TestMode:
    checkPoolsAndAdviseReclaim();
    break;
  default:
    throw std::runtime_error("Unsupported memory monitoring mode");
  }
}

// FreeMemory mode: samples available memory, records it, and requests advising
// when it is below lowerLimit_ or reclaiming when it is above upperLimit_ and
// some slabs are currently advised. Pending advise/reclaim work is then
// carried out via checkPoolsAndAdviseReclaim().
void MemoryMonitor::checkFreeMemory() {
  auto memFree = facebook::cachelib::util::getMemAvailable();
  memAvailableSize_ = memFree;
  rateLimiter_.addValue(memFree);
  const auto stats = cache_.getCacheMemoryStats();
  if (memFree < lowerLimit_) {
    XLOGF(DBG,
          "Free memory size of {} bytes is below the limit of {} bytes",
          memFree,
          lowerLimit_);
    adviseAwaySlabs();
  } else if (memFree > upperLimit_ && stats.numAdvisedSlabs() > 0) {
    XLOGF(DBG,
          "Free memory size of {} bytes is above the limit of {} bytes",
          memFree,
          upperLimit_);
    reclaimSlabs();
  }
  checkPoolsAndAdviseReclaim();
}

// ResidentMemory mode: samples the process RSS, records it, and requests
// advising when it is above upperLimit_ or reclaiming when it is below
// lowerLimit_ and some slabs are currently advised. Pending advise/reclaim
// work is then carried out via checkPoolsAndAdviseReclaim().
void MemoryMonitor::checkResidentMemory() {
  auto rss = static_cast<size_t>(facebook::cachelib::util::getRSSBytes());
  memRssSize_ = rss;
  rateLimiter_.addValue(rss);
  const auto stats = cache_.getCacheMemoryStats();
  if (rss > upperLimit_) {
    XLOGF(DBG,
          "Resident memory size of {} bytes is above the limit of {} bytes",
          rss,
          upperLimit_);
    adviseAwaySlabs();
  } else if (rss < lowerLimit_ && stats.numAdvisedSlabs() > 0) {
    XLOGF(DBG,
          "Resident memory size of {} bytes is below the limit of {} bytes",
          rss,
          lowerLimit_);
    reclaimSlabs();
  }
  checkPoolsAndAdviseReclaim();
}

namespace {
// Number of whole slabs that fit into `bytes` (rounds down).
size_t bytesToSlabs(size_t bytes) { return bytes / Slab::kSize; }
} // namespace

// Whole slabs covered by the pool's current used size.
size_t MemoryMonitor::getPoolUsedSlabs(PoolId poolId) const noexcept {
  return bytesToSlabs(cache_.getPool(poolId).getCurrentUsedSize());
}

// Whole slabs covered by the pool's usable size.
size_t MemoryMonitor::getPoolSlabs(PoolId poolId) const noexcept {
  return bytesToSlabs(cache_.getPool(poolId).getPoolUsableSize());
}

// Sum of getPoolSlabs() over all regular pools.
size_t MemoryMonitor::getTotalSlabs() const noexcept {
  const auto pools = cache_.getRegularPoolIds();
  return std::accumulate(pools.begin(),
                         pools.end(),
                         0ull,
                         [this](auto total, const auto& poolId) {
                           return total + getPoolSlabs(poolId);
                         });
}

// Sum of getPoolUsedSlabs() over all regular pools.
size_t MemoryMonitor::getSlabsInUse() const noexcept {
  const auto pools = cache_.getRegularPoolIds();
  return std::accumulate(pools.begin(),
                         pools.end(),
                         0ull,
                         [this](auto total, const auto& poolId) {
                           return total + getPoolUsedSlabs(poolId);
                         });
}

// Asks the cache how many slabs each pool should advise away or reclaim and
// performs that work. When advising, victims are picked one slab at a time via
// the rebalance strategy and each successful release is recorded in stats_.
void MemoryMonitor::checkPoolsAndAdviseReclaim() {
  auto results = cache_.calcNumSlabsToAdviseReclaim();
  if (results.poolAdviseReclaimMap.empty()) {
    return;
  }

  // all result would either be advise or reclaim. It is not possible for
  // some of them to be advise and some to reclaim

  // Advise slabs, if marked for advise
  if (results.advise) {
    for (auto& result : results.poolAdviseReclaimMap) {
      uint64_t slabsAdvised = 0;
      PoolId poolId = result.first;
      uint64_t slabsToAdvise = result.second;
      while (slabsAdvised < slabsToAdvise) {
        const auto classId = strategy_->pickVictimForResizing(cache_, poolId);
        if (classId == Slab::kInvalidClassId) {
          // The strategy has no victim to offer; give up on this pool.
          break;
        }
        try {
          const auto now = util::getCurrentTimeMs();
          // Snapshot taken before the release so that the event below reports
          // the pre-release state of the class.
          auto stats = cache_.getPoolStats(poolId);
          cache_.releaseSlab(poolId, classId, SlabReleaseMode::kAdvise);
          ++slabsAdvised;
          const auto elapsed_time =
              static_cast<uint64_t>(util::getCurrentTimeMs() - now);
          // NOTE(review): the results of these two calls are discarded and the
          // same accessors are invoked again below; they look removable --
          // confirm they have no side effects.
          stats.numSlabsForClass(classId);
          stats.evictionAgeForClass(classId);

          // Log the event about the Pool which released the Slab along with
          // the number of slabs.
          stats_.addSlabReleaseEvent(
              classId,
              Slab::kInvalidClassId, /* No Class info */
              elapsed_time,
              poolId,
              stats.numSlabsForClass(classId),
              0 /* receiver slabs */,
              stats.allocSizeForClass(classId),
              0 /* receiver alloc size */,
              stats.evictionAgeForClass(classId),
              0 /* receiver eviction age */,
              stats.numFreeAllocsForClass(classId));
        } catch (const exception::SlabReleaseAborted& e) {
          XLOGF(WARN,
                "Aborted trying to advise away a slab from pool {} for"
                " allocation class {}. Error: {}",
                static_cast<int>(poolId),
                static_cast<int>(classId),
                e.what());
          // Slabs released before the abort were really advised away; count
          // them before bailing out so slabsAdvised_ does not undercount.
          slabsAdvised_ += slabsAdvised;
          return;
        } catch (const std::exception& e) {
          // NOTE(review): the loop retries after this failure without any
          // bound; if the release keeps throwing for the picked class this
          // spins while logging CRITICAL on every attempt.
          XLOGF(
              CRITICAL,
              "Error trying to advise away a slab from pool {} for allocation "
              "class {}. Error: {}",
              static_cast<int>(poolId),
              static_cast<int>(classId),
              e.what());
        }
      }
      slabsAdvised_ += slabsAdvised;
      XLOGF(DBG,
            "Advised away {} slabs from Pool ID: {}, to free {} bytes",
            slabsAdvised,
            static_cast<int>(poolId),
            slabsAdvised * Slab::kSize);
    }
    return;
  }

  XDCHECK(!results.advise);
  // Reclaim slabs, if marked for reclaim
  for (auto& result : results.poolAdviseReclaimMap) {
    PoolId poolId = result.first;
    uint64_t slabsToReclaim = result.second;
    auto slabsReclaimed = cache_.reclaimSlabs(poolId, slabsToReclaim);
    XLOGF(
        DBG,
        "Reclaimed {} of {} slabs for Pool ID: {}, to grow cache by {} bytes",
        slabsReclaimed,
        slabsToReclaim,
        static_cast<int>(poolId),
        slabsReclaimed * Slab::kSize);
    slabsReclaimed_ += slabsReclaimed;
  }
}

// Raises the cache's advise target by percentAdvisePerIteration_% of the
// (upperLimit_ - lowerLimit_) band, unless nothing is in use or the advised
// share already exceeds maxLimitPercent_.
void MemoryMonitor::adviseAwaySlabs() {
  const auto totalSlabsInUse = getSlabsInUse();
  const auto totalSlabs = getTotalSlabs();
  if (totalSlabsInUse == 0 || totalSlabs == 0) {
    // If there are no used slabs and we're still having to advise away, then
    // the cache size is too big!
    XLOG(DBG, "There are no slabs in use to advise away");
    return;
  }
  const auto numAdvised = cache_.getCacheMemoryStats().numAdvisedSlabs();
  // totalSlabs is non-zero here, so the denominator cannot be zero.
  const auto advisedPercent = numAdvised * 100 / (numAdvised + totalSlabs);
  if (advisedPercent > maxLimitPercent_) {
    XLOGF(CRITICAL,
          "More than {} slabs of {} ({}"
          "%) in the item cache memory have been advised away. "
          "This exceeds the maximum limit of {}"
          "%. Disabling advising which may result in an OOM.",
          numAdvised,
          totalSlabs,
          advisedPercent,
          maxLimitPercent_);
    return;
  }
  // Advise percentAdvisePerIteration_% of upperLimit_ - lowerLimit_
  // every iteration
  const auto slabsToAdvise =
      bytesToSlabs(upperLimit_ - lowerLimit_) * percentAdvisePerIteration_ /
      100;
  XLOGF(DBG,
        "Advising away {} slabs to free {} bytes",
        slabsToAdvise,
        slabsToAdvise * Slab::kSize);
  cache_.updateNumSlabsToAdvise(slabsToAdvise);
}

// Lowers the cache's advise target by up to percentReclaimPerIteration_% of
// the (upperLimit_ - lowerLimit_) band, throttled by the rate limiter and
// capped at the number of currently advised slabs.
void MemoryMonitor::reclaimSlabs() {
  // Reclaim percentReclaimPerIteration_% of upperLimit_ - lowerLimit_
  // every iteration
  const auto reclaimBytes =
      (upperLimit_ - lowerLimit_) * percentReclaimPerIteration_ / 100;

  // Rate limit reclaimed memory if free memory is dropping or rss is rising
  // to prevent OOM
  const auto rateLimitedReclaimBytes = rateLimiter_.throttle(reclaimBytes);
  if (reclaimBytes > rateLimitedReclaimBytes) {
    XLOGF(DBG,
          "Rate limiting reclaim down from {} bytes to {} bytes",
          reclaimBytes,
          rateLimitedReclaimBytes);
  }
  auto slabsToReclaim = bytesToSlabs(rateLimitedReclaimBytes);
  const auto stats = cache_.getCacheMemoryStats();
  if (slabsToReclaim > stats.numAdvisedSlabs()) {
    slabsToReclaim = stats.numAdvisedSlabs();
  }
  if (slabsToReclaim == 0) {
    return;
  }
  const auto totalSlabsInUse = getSlabsInUse();
  if (totalSlabsInUse == 0) {
    XLOG(CRITICAL, "There are no slabs in use by items cache, cannot reclaim");
    return;
  }
  XLOGF(DBG,
        "Reclaiming {} slabs to increase cache size by {} bytes",
        slabsToReclaim,
        slabsToReclaim * Slab::kSize);
  // NOTE(review): slabsToReclaim is unsigned, so the unary minus wraps; this
  // presumably relies on conversion to a signed parameter in the callee --
  // confirm against the updateNumSlabsToAdvise() declaration.
  cache_.updateNumSlabsToAdvise(-slabsToReclaim);
}

// @param detectIncrease  true to treat a rising sample series as the change
//                        being rate limited, false to treat a falling one.
RateLimiter::RateLimiter(bool detectIncrease)
    : detectIncrease_(detectIncrease) {}

// Records a new sample and updates rateOfChange_ as the difference between the
// new sample and the oldest retained one, divided by the window size. The sign
// is chosen so that a positive rate means change in the detected direction.
void RateLimiter::addValue(int64_t value) {
  if (windowSize_ < 2) {
    // Window size not large enough to calculate rate of change.
    // This effectively disables rate limiting.
    return;
  }
  values_.push_back(value);
  // Read the oldest sample before trimming so the difference spans the window.
  auto prevValue = values_.front();
  // We may remove multiple values if window size shrinks
  while (values_.size() > windowSize_) {
    values_.pop_front();
  }
  if (detectIncrease_) {
    rateOfChange_ = (value - prevValue) / static_cast<int64_t>(windowSize_);
  } else {
    rateOfChange_ = (prevValue - value) / static_cast<int64_t>(windowSize_);
  }
}

// Returns how much of `delta` may be applied given the observed rate of
// change: all of it when the rate is negative or limiting is disabled, none
// while the window is not yet full or the rate exceeds `delta`, and
// `delta - rateOfChange_` otherwise.
size_t RateLimiter::throttle(int64_t delta) {
  if (rateOfChange_ < 0 || windowSize_ < 2) {
    return delta; // No throttling
  }
  // Fully throttled when we either have insufficient number of samples or
  // rate of change is faster than proposed delta change.
  if (values_.size() < windowSize_ || delta < rateOfChange_) {
    return 0;
  }
  // Throttle down delta by rate of change. The greater the rate of change, the
  // more the delta is throttled.
  return delta - rateOfChange_;
}

} // namespace cachelib
} // namespace facebook