cachelib/navy/block_cache/RegionManager.cpp

/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "cachelib/navy/block_cache/RegionManager.h"

#include "cachelib/navy/common/Utils.h"
#include "cachelib/navy/scheduler/JobScheduler.h"

namespace facebook {
namespace cachelib {
namespace navy {
RegionManager::RegionManager(uint32_t numRegions,
                             uint64_t regionSize,
                             uint64_t baseOffset,
                             Device& device,
                             uint32_t numCleanRegions,
                             JobScheduler& scheduler,
                             RegionEvictCallback evictCb,
                             RegionCleanupCallback cleanupCb,
                             std::unique_ptr<EvictionPolicy> policy,
                             uint32_t numInMemBuffers,
                             uint16_t numPriorities,
                             uint16_t inMemBufFlushRetryLimit)
    : numPriorities_{numPriorities},
      inMemBufFlushRetryLimit_{inMemBufFlushRetryLimit},
      numRegions_{numRegions},
      regionSize_{regionSize},
      baseOffset_{baseOffset},
      device_{device},
      policy_{std::move(policy)},
      regions_{std::make_unique<std::unique_ptr<Region>[]>(numRegions)},
      numCleanRegions_{numCleanRegions},
      scheduler_{scheduler},
      evictCb_{evictCb},
      cleanupCb_{cleanupCb},
      numInMemBuffers_{numInMemBuffers} {
  XLOGF(INFO, "{} regions, {} bytes each", numRegions_, regionSize_);
  for (uint32_t i = 0; i < numRegions; i++) {
    regions_[i] = std::make_unique<Region>(RegionId{i}, regionSize_);
  }

  XDCHECK_LT(0u, numInMemBuffers_);
  for (uint32_t i = 0; i < numInMemBuffers_; i++) {
    buffers_.push_back(
        std::make_unique<Buffer>(device.makeIOBuffer(regionSize_)));
  }

  resetEvictionPolicy();
}

RegionId RegionManager::evict() {
  auto rid = policy_->evict();
  if (!rid.valid()) {
    XLOG(ERR, "Eviction failed");
  } else {
    XLOGF(DBG, "Evict {}", rid.index());
  }
  return rid;
}

void RegionManager::touch(RegionId rid) {
  auto& region = getRegion(rid);
  XDCHECK_EQ(rid, region.id());
  if (!region.hasBuffer()) {
    policy_->touch(rid);
  }
}

void RegionManager::track(RegionId rid) {
  auto& region = getRegion(rid);
  XDCHECK_EQ(rid, region.id());
  policy_->track(region);
}

void RegionManager::reset() {
  for (uint32_t i = 0; i < numRegions_; i++) {
    regions_[i]->reset();
  }
  {
    std::lock_guard<std::mutex> lock{cleanRegionsMutex_};
    // Reset is inherently single threaded. All pending jobs, including
    // reclaims, have to be finished first.
    XDCHECK_EQ(reclaimsScheduled_, 0u);
    cleanRegions_.clear();
  }
  seqNumber_.store(0, std::memory_order_release);

  // Reset eviction policy
  resetEvictionPolicy();
}
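// In-memory buffer lifecycle: flushBuffer() writes a region's in-memory
// buffer to the device, detachBuffer() returns the buffer to the pool once
// no active readers remain, and cleanupBufferOnFlushFailure() drops the
// buffer's entries via cleanupCb_ after the flush retry limit is exhausted.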
Region::FlushRes RegionManager::flushBuffer(const RegionId& rid) {
  auto& region = getRegion(rid);
  auto callBack = [this](RelAddress addr, BufferView view) {
    auto writeBuffer = device_.makeIOBuffer(view.size());
    writeBuffer.copyFrom(0, view);
    if (!deviceWrite(addr, std::move(writeBuffer))) {
      return false;
    }
    numInMemBufWaitingFlush_.dec();
    return true;
  };

  // This is a no-op if the buffer is already flushed
  return region.flushBuffer(std::move(callBack));
}

bool RegionManager::detachBuffer(const RegionId& rid) {
  auto& region = getRegion(rid);
  // detachBuffer() can return nullptr if there are active readers
  auto buf = region.detachBuffer();
  if (!buf) {
    return false;
  }
  returnBufferToPool(std::move(buf));
  return true;
}

bool RegionManager::cleanupBufferOnFlushFailure(const RegionId& regionId) {
  auto& region = getRegion(regionId);
  auto callBack = [this](RegionId rid, BufferView buffer) {
    cleanupCb_(rid, buffer);
    numInMemBufWaitingFlush_.dec();
    numInMemBufFlushFailures_.inc();
  };

  // This is a no-op if the buffer is already cleaned up.
  if (!region.cleanupBuffer(std::move(callBack))) {
    return false;
  }
  return detachBuffer(regionId);
}

void RegionManager::releaseCleanedupRegion(RegionId rid) {
  auto& region = getRegion(rid);
  // Subtract the wasted bytes at the end of the region
  externalFragmentation_.sub(getRegion(rid).getFragmentationSize());

  // Full barrier because we cannot have seqNumber_.fetch_add() re-ordered
  // below region.reset(). It is similar to the full barrier in openForRead.
  seqNumber_.fetch_add(1, std::memory_order_acq_rel);

  // Reset all region internal state, making it ready to be
  // used by a region allocator.
  region.reset();
  {
    std::lock_guard<std::mutex> lock{cleanRegionsMutex_};
    cleanRegions_.push_back(rid);
  }
}
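// Clean-region management: a pool of reclaimed ("clean") regions and
// in-memory buffers is kept ready for allocation. getCleanRegion() hands one
// out and schedules additional reclaim jobs so that roughly numCleanRegions_
// regions stay available.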
OpenStatus RegionManager::assignBufferToRegion(RegionId rid) {
  XDCHECK(rid.valid());
  auto buf = claimBufferFromPool();
  if (!buf) {
    return OpenStatus::Retry;
  }
  auto& region = getRegion(rid);
  region.attachBuffer(std::move(buf));
  return OpenStatus::Ready;
}

std::unique_ptr<Buffer> RegionManager::claimBufferFromPool() {
  std::unique_ptr<Buffer> buf;
  {
    std::lock_guard<std::mutex> bufLock{bufferMutex_};
    if (buffers_.empty()) {
      return nullptr;
    }
    buf = std::move(buffers_.back());
    buffers_.pop_back();
  }
  numInMemBufActive_.inc();
  return buf;
}

OpenStatus RegionManager::getCleanRegion(RegionId& rid) {
  auto status = OpenStatus::Retry;
  uint32_t newSched = 0;
  {
    std::lock_guard<std::mutex> lock{cleanRegionsMutex_};
    if (!cleanRegions_.empty()) {
      rid = cleanRegions_.back();
      cleanRegions_.pop_back();
      status = OpenStatus::Ready;
    } else {
      status = OpenStatus::Retry;
    }
    auto plannedClean = cleanRegions_.size() + reclaimsScheduled_;
    if (plannedClean < numCleanRegions_) {
      newSched = numCleanRegions_ - plannedClean;
      reclaimsScheduled_ += newSched;
    }
  }
  for (uint32_t i = 0; i < newSched; i++) {
    scheduler_.enqueue(
        [this] { return startReclaim(); }, "reclaim", JobType::Reclaim);
  }
  if (status == OpenStatus::Ready) {
    status = assignBufferToRegion(rid);
    if (status != OpenStatus::Ready) {
      std::lock_guard<std::mutex> lock{cleanRegionsMutex_};
      cleanRegions_.push_back(rid);
    }
  }
  return status;
}

void RegionManager::doFlush(RegionId rid, bool async) {
  // We're wasting the remaining bytes of a region, so track it for stats
  externalFragmentation_.add(getRegion(rid).getFragmentationSize());

  getRegion(rid).setPendingFlush();
  numInMemBufWaitingFlush_.inc();

  Job flushJob = [this, rid, retryAttempts = 0, flushed = false]() mutable {
    if (!flushed) {
      if (retryAttempts >= inMemBufFlushRetryLimit_) {
        // Flush failures reached the retry limit; stop flushing and start to
        // clean up the buffer.
        if (cleanupBufferOnFlushFailure(rid)) {
          releaseCleanedupRegion(rid);
          return JobExitCode::Done;
        }
        numInMemBufCleanupRetries_.inc();
        return JobExitCode::Reschedule;
      }
      auto res = flushBuffer(rid);
      if (res == Region::FlushRes::kSuccess) {
        flushed = true;
      } else {
        // Device-level flush failures are only retried up to the retry limit
        if (res == Region::FlushRes::kRetryDeviceFailure) {
          retryAttempts++;
          numInMemBufFlushRetries_.inc();
        }
        return JobExitCode::Reschedule;
      }
    }
    // Once the buffer has been flushed (either previously or by the attempt
    // above), try to detach it; keep rescheduling until the detach succeeds
    if (flushed) {
      if (detachBuffer(rid)) {
        // Flush completed, track the region
        track(rid);
        return JobExitCode::Done;
      }
    }
    return JobExitCode::Reschedule;
  };

  if (async) {
    scheduler_.enqueue(std::move(flushJob), "flush", JobType::Flush);
  } else {
    while (flushJob() == JobExitCode::Reschedule) {
      // We intentionally sleep here to slow it down since this is only
      // triggered on shutdown. On cleanup failures, we will sleep a bit
      // before retrying to avoid maxing out cpu.
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::milliseconds{100});
    }
  }
}
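// Reclaim runs in two stages: startReclaim() asks the eviction policy for a
// victim region, then a follow-up job waits for in-flight accesses to drain,
// reads the region back from the device and evicts its items from the index
// via doEviction(), and finally releases the region to the clean pool.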
JobExitCode RegionManager::startReclaim() {
  auto rid = evict();
  if (!rid.valid()) {
    return JobExitCode::Reschedule;
  }
  scheduler_.enqueue(
      [this, rid] {
        const auto startTime = getSteadyClock();
        auto& region = getRegion(rid);
        if (!region.readyForReclaim()) {
          // Once a region is set exclusive, all future accesses will be
          // blocked. However there might still be accesses in-flight,
          // so we would retry if that's the case.
          return JobExitCode::Reschedule;
        }
        // We know now we're the only thread working with this region.
        // Hence, it's safe to access @Region without lock.
        if (region.getNumItems() != 0) {
          XDCHECK(!region.hasBuffer());
          auto desc = RegionDescriptor::makeReadDescriptor(
              OpenStatus::Ready, RegionId{rid}, true /* physRead */);
          auto sizeToRead = region.getLastEntryEndOffset();
          auto buffer = read(desc, RelAddress{rid, 0}, sizeToRead);
          if (buffer.size() != sizeToRead) {
            // TODO: remove when we fix T95777575
            XLOGF(ERR,
                  "Failed to read region {} during reclaim. Region size to "
                  "read: {}, Actually read: {}",
                  rid.index(),
                  sizeToRead,
                  buffer.size());
            reclaimRegionErrors_.inc();
          } else {
            doEviction(rid, buffer.view());
          }
        }
        releaseEvictedRegion(rid, startTime);
        return JobExitCode::Done;
      },
      "reclaim.evict",
      JobType::Reclaim);
  return JobExitCode::Done;
}

RegionDescriptor RegionManager::openForRead(RegionId rid, uint64_t seqNumber) {
  auto& region = getRegion(rid);
  auto desc = region.openForRead();
  if (!desc.isReady()) {
    return desc;
  }
  // << Interaction of Region Lock and Sequence Number >>
  //
  // Reader:
  //  1r. Load seq number
  //  2r. Check index
  //  3r. Open region
  //  4r. Load seq number
  //      If it hasn't changed, proceed to read and close region.
  //      Otherwise, abort read and close region.
  //
  // Reclaim:
  //  1x. Mark region ready for reclaim
  //  2x. Reclaim and evict entries from index
  //  3x. Store seq number
  //  4x. Reset region
  //
  // In order for these two sequences of operations to not have a data race,
  // we must guarantee the following ordering:
  //  3r -> 4r
  //
  // We know that 3r either happens before 1x or happens after 4x. This
  // means with the above ordering, 4r will either:
  //  1. Read the same seq number and proceed to read
  //     (3r -> 4r -> (read item and close region) -> 1x)
  //  2. Or, read a different seq number and abort the read (4x -> 3r -> 4r)
  // Either of the above is a CORRECT operation.
  //
  // 3r has mutex::lock() at the beginning, so it prevents 4r from being
  // reordered above it.
  //
  // We also need to ensure 3x is not re-ordered below 4x. This is handled
  // by an acq_rel memory order in 3x. See releaseEvictedRegion() for details.
  //
  // Finally, 4r has acquire semantics, which synchronizes-with 3x's acq_rel.
  if (seqNumber_.load(std::memory_order_acquire) != seqNumber) {
    region.close(std::move(desc));
    return RegionDescriptor{OpenStatus::Retry};
  }
  return desc;
}

void RegionManager::close(RegionDescriptor&& desc) {
  RegionId rid = desc.id();
  auto& region = getRegion(rid);
  region.close(std::move(desc));
}

void RegionManager::releaseEvictedRegion(RegionId rid,
                                         std::chrono::nanoseconds startTime) {
  auto& region = getRegion(rid);
  // Subtract the wasted bytes at the end since we're reclaiming this region
  externalFragmentation_.sub(getRegion(rid).getFragmentationSize());

  // Full barrier because we cannot have seqNumber_.fetch_add() re-ordered
  // below region.reset(). If it is re-ordered, we can end up with a data
  // race where a read returns stale data. See openForRead() for details.
  seqNumber_.fetch_add(1, std::memory_order_acq_rel);

  // Reset all region internal state, making it ready to be
  // used by a region allocator.
  region.reset();
  {
    std::lock_guard<std::mutex> lock{cleanRegionsMutex_};
    reclaimsScheduled_--;
    cleanRegions_.push_back(rid);
  }
  reclaimTimeCountUs_.add(toMicros(getSteadyClock() - startTime).count());
  reclaimCount_.inc();
}

void RegionManager::doEviction(RegionId rid, BufferView buffer) const {
  if (buffer.isNull()) {
    XLOGF(ERR, "Error reading region {} on reclamation", rid.index());
  } else {
    const auto evictStartTime = getSteadyClock();
    XLOGF(DBG, "Evict region {} entries", rid.index());
    auto numEvicted = evictCb_(rid, buffer);
    XLOGF(DBG,
          "Evict region {} entries: {} us",
          rid.index(),
          toMicros(getSteadyClock() - evictStartTime).count());
    evictedCount_.add(numEvicted);
  }
}
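// Persistence: persist() serializes per-region metadata (last entry end
// offset, priority, item count) so that recover() can rebuild the regions
// and re-seed the eviction policy on restart.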
void RegionManager::persist(RecordWriter& rw) const {
  serialization::RegionData regionData;
  *regionData.regionSize_ref() = regionSize_;
  regionData.regions_ref()->resize(numRegions_);
  for (uint32_t i = 0; i < numRegions_; i++) {
    auto& regionProto = regionData.regions_ref()[i];
    *regionProto.regionId_ref() = i;
    *regionProto.lastEntryEndOffset_ref() =
        regions_[i]->getLastEntryEndOffset();
    regionProto.priority_ref() = regions_[i]->getPriority();
    *regionProto.numItems_ref() = regions_[i]->getNumItems();
  }
  serializeProto(regionData, rw);
}

void RegionManager::recover(RecordReader& rr) {
  auto regionData = deserializeProto<serialization::RegionData>(rr);
  if (regionData.regions_ref()->size() != numRegions_ ||
      static_cast<uint32_t>(*regionData.regionSize_ref()) != regionSize_) {
    throw std::invalid_argument(
        "Could not recover RegionManager. Invalid RegionData.");
  }

  for (auto& regionProto : *regionData.regions_ref()) {
    uint32_t index = *regionProto.regionId_ref();
    if (index >= numRegions_ ||
        static_cast<uint32_t>(*regionProto.lastEntryEndOffset_ref()) >
            regionSize_) {
      throw std::invalid_argument(
          "Could not recover RegionManager. Invalid RegionId.");
    }
    // To handle compatibility between different priorities: if the current
    // setup has fewer priorities than the last run, automatically downgrade
    // all higher priorities to the current max.
    if (numPriorities_ > 0 && regionProto.priority_ref() >= numPriorities_) {
      regionProto.priority_ref() = numPriorities_ - 1;
    }
    regions_[index] =
        std::make_unique<Region>(regionProto, *regionData.regionSize_ref());
  }

  // Reset policy and reinitialize it per the recovered state
  resetEvictionPolicy();
}

void RegionManager::resetEvictionPolicy() {
  XDCHECK_GT(numRegions_, 0u);

  policy_->reset();
  externalFragmentation_.set(0);

  // Go through all the regions, restore the fragmentation size, and track
  // all empty regions
  for (uint32_t i = 0; i < numRegions_; i++) {
    externalFragmentation_.add(regions_[i]->getFragmentationSize());
    if (regions_[i]->getNumItems() == 0) {
      track(RegionId{i});
    }
  }

  // Now track all non-empty regions. This should ensure empty regions are
  // pushed to the bottom for both LRU and FIFO policies.
  for (uint32_t i = 0; i < numRegions_; i++) {
    if (regions_[i]->getNumItems() != 0) {
      track(RegionId{i});
    }
  }
}
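// The device I/O helpers below translate region-relative addresses to
// physical offsets and bound-check them against the region size before
// touching the device.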
bool RegionManager::isValidIORange(uint32_t offset, uint32_t size) const {
  return static_cast<uint64_t>(offset) + size <= regionSize_;
}

bool RegionManager::deviceWrite(RelAddress addr, Buffer buf) {
  const auto bufSize = buf.size();
  XDCHECK(isValidIORange(addr.offset(), bufSize));
  auto physOffset = physicalOffset(addr);
  if (!device_.write(physOffset, std::move(buf))) {
    return false;
  }
  physicalWrittenCount_.add(bufSize);
  return true;
}

void RegionManager::write(RelAddress addr, Buffer buf) {
  auto rid = addr.rid();
  auto& region = getRegion(rid);
  region.writeToBuffer(addr.offset(), buf.view());
}

Buffer RegionManager::read(const RegionDescriptor& desc,
                           RelAddress addr,
                           size_t size) const {
  auto rid = addr.rid();
  auto& region = getRegion(rid);
  // Do not expect to read beyond what was already written
  XDCHECK_LE(addr.offset() + size, region.getLastEntryEndOffset());
  if (!desc.isPhysReadMode()) {
    auto buffer = Buffer(size);
    XDCHECK(region.hasBuffer());
    region.readFromBuffer(addr.offset(), buffer.mutableView());
    return buffer;
  }
  XDCHECK(isValidIORange(addr.offset(), size));

  return device_.read(physicalOffset(addr), size);
}

void RegionManager::flush() { device_.flush(); }

void RegionManager::getCounters(const CounterVisitor& visitor) const {
  visitor("navy_bc_reclaim", reclaimCount_.get());
  visitor("navy_bc_reclaim_time", reclaimTimeCountUs_.get());
  visitor("navy_bc_region_reclaim_errors", reclaimRegionErrors_.get());
  visitor("navy_bc_evicted", evictedCount_.get());
  visitor("navy_bc_num_regions", numRegions_);
  visitor("navy_bc_num_clean_regions", cleanRegions_.size());
  visitor("navy_bc_external_fragmentation", externalFragmentation_.get());
  visitor("navy_bc_physical_written", physicalWrittenCount_.get());
  visitor("navy_bc_inmem_active", numInMemBufActive_.get());
  visitor("navy_bc_inmem_waiting_flush", numInMemBufWaitingFlush_.get());
  visitor("navy_bc_inmem_flush_retries", numInMemBufFlushRetries_.get());
  visitor("navy_bc_inmem_flush_failures", numInMemBufFlushFailures_.get());
  visitor("navy_bc_inmem_cleanup_retries", numInMemBufCleanupRetries_.get());

  policy_->getCounters(visitor);
}
} // namespace navy
} // namespace cachelib
} // namespace facebook