common/protobuf/kudu/util/file_cache.cc (578 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "kudu/util/file_cache.h" #include <atomic> #include <cstdint> #include <cstring> #include <functional> #include <memory> #include <mutex> #include <ostream> #include <string> #include <utility> #include <vector> #include <gflags/gflags.h> #include <glog/logging.h> #include "kudu/gutil/macros.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/array_view.h" #include "kudu/util/cache.h" #include "kudu/util/cache_metrics.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/env.h" #include "kudu/util/file_cache_metrics.h" #include "kudu/util/flag_tags.h" #include "kudu/util/locks.h" #include "kudu/util/monotime.h" #include "kudu/util/once.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/thread.h" DEFINE_int32(file_cache_expiry_period_ms, 60 * 1000, "Period of time (in ms) between removing expired file cache descriptors"); TAG_FLAG(file_cache_expiry_period_ms, advanced); using std::shared_ptr; using std::string; using std::unique_ptr; using std::vector; using strings::Substitute; namespace kudu { namespace { template <class FileType> FileType* CacheValueToFileType(Slice s) { return reinterpret_cast<FileType*>(*reinterpret_cast<void**>( s.mutable_data())); } class EvictionCallback : public Cache::EvictionCallback { public: EvictionCallback() {} void EvictedEntry(Slice key, Slice value) override { VLOG(2) << "Evicted fd belonging to " << key.ToString(); delete CacheValueToFileType<File>(value); } private: DISALLOW_COPY_AND_ASSIGN(EvictionCallback); }; } // anonymous namespace namespace internal { template <class FileType> class ScopedOpenedDescriptor; // Encapsulates common descriptor fields and methods. template <class FileType> class BaseDescriptor { public: BaseDescriptor(FileCache* file_cache, string filename) : file_cache_(file_cache), file_name_(std::move(filename)) {} ~BaseDescriptor() { VLOG(2) << "Out of scope descriptor with file name: " << filename(); // The destruction of the descriptor indicates that there's no active user // of the file at the moment. However, if the fd is still open in the LRU // cache, should we leave it there, or should we evict it? // // We opt for eviction: your typical filesystem user is more likely to want // the underlying resource released when they're done working with a file // than they are to want the resource to be quickly accessible should the // file be reopened. cache()->Erase(filename()); if (deleted()) { VLOG(1) << "Deleting file: " << filename(); WARN_NOT_OK(env()->DeleteFile(filename()), ""); } // The (now expired) weak_ptr remains in 'descriptors_', to be removed by // the next call to RunDescriptorExpiry(). Removing it here would risk a // deadlock on recursive acquisition of 'lock_'. } // Insert a pointer to an open file object into the file cache with the // filename as the cache key. // // Returns a handle to the inserted entry. The handle always contains an open // file. ScopedOpenedDescriptor<FileType> InsertIntoCache(void* file_ptr) const { // The allocated charge is always one byte. This is incorrect with respect // to memory tracking, but it's necessary if the cache capacity is to be // equivalent to the max number of fds. auto pending(cache()->Allocate(filename(), sizeof(file_ptr), 1)); CHECK(pending); memcpy(cache()->MutableValue(&pending), &file_ptr, sizeof(file_ptr)); return ScopedOpenedDescriptor<FileType>( this, cache()->Insert(std::move(pending), file_cache_->eviction_cb_.get())); } // Retrieves a pointer to an open file object from the file cache with the // filename as the cache key. // // Returns a handle to the looked up entry. The handle may or may not contain // an open file, depending on whether the cache hit or missed. ScopedOpenedDescriptor<FileType> LookupFromCache() const { return ScopedOpenedDescriptor<FileType>( this, cache()->Lookup(filename(), Cache::EXPECT_IN_CACHE)); } // Mark this descriptor as to-be-deleted later. void MarkDeleted() { DCHECK(!deleted()); while (true) { auto v = flags_.load(); if (flags_.compare_exchange_weak(v, v | FILE_DELETED)) return; } } // Mark this descriptor as invalidated. No further access is allowed // to this file. void MarkInvalidated() { DCHECK(!invalidated()); while (true) { auto v = flags_.load(); if (flags_.compare_exchange_weak(v, v | INVALIDATED)) return; } } Cache* cache() const { return file_cache_->cache_.get(); } Env* env() const { return file_cache_->env_; } const string& filename() const { return file_name_; } bool deleted() const { return flags_.load() & FILE_DELETED; } bool invalidated() const { return flags_.load() & INVALIDATED; } private: FileCache* file_cache_; const string file_name_; enum Flags { FILE_DELETED = 1 << 0, INVALIDATED = 1 << 1 }; std::atomic<uint8_t> flags_ {0}; DISALLOW_COPY_AND_ASSIGN(BaseDescriptor); }; // A "smart" retrieved LRU cache handle. // // The cache handle is released when this object goes out of scope, possibly // closing the opened file if it is no longer in the cache. template <class FileType> class ScopedOpenedDescriptor { public: // A not-yet-but-soon-to-be opened descriptor. explicit ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc) : desc_(desc), handle_(nullptr, Cache::HandleDeleter(desc_->cache())) { } // An opened descriptor. Its handle may or may not contain an open file. ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc, Cache::UniqueHandle handle) : desc_(desc), handle_(std::move(handle)) { } bool opened() const { return handle_.get(); } FileType* file() const { DCHECK(opened()); return CacheValueToFileType<FileType>(desc_->cache()->Value(handle_)); } private: const BaseDescriptor<FileType>* desc_; Cache::UniqueHandle handle_; }; // Reference to an on-disk file that may or may not be opened (and thus // cached) in the file cache. // // This empty template is just a specification; actual descriptor classes must // be fully specialized. template <class FileType> class Descriptor : public FileType { }; // A descriptor adhering to the RWFile interface (i.e. when opened, provides // a read-write interface to the underlying file). template <> class Descriptor<RWFile> : public RWFile { public: Descriptor(FileCache* file_cache, const string& filename) : base_(file_cache, filename) {} ~Descriptor() = default; Status Read(uint64_t offset, Slice result) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Read(offset, result); } Status ReadV(uint64_t offset, ArrayView<Slice> results) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->ReadV(offset, results); } Status Write(uint64_t offset, const Slice& data) override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Write(offset, data); } Status WriteV(uint64_t offset, ArrayView<const Slice> data) override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->WriteV(offset, data); } Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->PreAllocate(offset, length, mode); } Status Truncate(uint64_t length) override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Truncate(length); } Status PunchHole(uint64_t offset, size_t length) override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->PunchHole(offset, length); } Status Flush(FlushMode mode, uint64_t offset, size_t length) override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Flush(mode, offset, length); } Status Sync() override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Sync(); } Status Close() override { // Intentional no-op; actual closing is deferred to LRU cache eviction. return Status::OK(); } Status Size(uint64_t* size) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Size(size); } Status GetExtentMap(ExtentMap* out) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->GetExtentMap(out); } bool IsEncrypted() const override { return true; } const string& filename() const override { return base_.filename(); } size_t GetEncryptionHeaderSize() const override { return base_.env()->GetEncryptionHeaderSize(); } private: friend class ::kudu::FileCache; template <Env::OpenMode Mode> Status Init() { return once_.Init(&Descriptor<RWFile>::InitOnce<Mode>, this); } template <Env::OpenMode Mode> Status InitOnce() { return ReopenFileIfNecessary<Mode>(nullptr); } template <Env::OpenMode Mode> Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const { ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache()); CHECK(!base_.invalidated()); if (found.opened()) { // The file is already open in the cache, return it. if (out) { *out = std::move(found); } return Status::OK(); } // The file was evicted, reopen it. RWFileOptions opts; opts.mode = Mode; opts.is_sensitive = true; unique_ptr<RWFile> f; RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f)); // The cache will take ownership of the newly opened file. ScopedOpenedDescriptor<RWFile> opened(base_.InsertIntoCache(f.release())); if (out) { *out = std::move(opened); } return Status::OK(); } BaseDescriptor<RWFile> base_; KuduOnceDynamic once_; DISALLOW_COPY_AND_ASSIGN(Descriptor); }; // A descriptor adhering to the RandomAccessFile interface (i.e. when opened, // provides a read-only interface to the underlying file). template <> class Descriptor<RandomAccessFile> : public RandomAccessFile { public: Descriptor(FileCache* file_cache, const string& filename) : base_(file_cache, filename) {} ~Descriptor() = default; Status Read(uint64_t offset, Slice result) const override { ScopedOpenedDescriptor<RandomAccessFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); return opened.file()->Read(offset, result); } Status ReadV(uint64_t offset, ArrayView<Slice> results) const override { ScopedOpenedDescriptor<RandomAccessFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); return opened.file()->ReadV(offset, results); } Status Size(uint64_t *size) const override { ScopedOpenedDescriptor<RandomAccessFile> opened(&base_); RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); return opened.file()->Size(size); } const string& filename() const override { return base_.filename(); } size_t GetEncryptionHeaderSize() const override { return base_.env()->GetEncryptionHeaderSize(); } size_t memory_footprint() const override { // Normally we would use kudu_malloc_usable_size(this). However, that's // not safe because 'this' was allocated via std::make_shared(), which // means it isn't necessarily the base of the memory allocation; it may be // preceded by the shared_ptr control block. // // It doesn't appear possible to get the base of the allocation via any // shared_ptr APIs, so we'll use sizeof(*this) + 16 instead. The 16 bytes // represent the shared_ptr control block. Overall the object size is still // undercounted as it doesn't account for any internal heap fragmentation, // but at least it's safe. // // Some anecdotal memory measurements taken inside gdb: // - glibc 2.23 malloc_usable_size() on make_shared<FileType>: 88 bytes. // - tcmalloc malloc_usable_size() on make_shared<FileType>: 96 bytes. // - sizeof(std::_Sp_counted_base<>) with libstdc++ 5.4: 16 bytes. // - sizeof(std::__1::__shared_ptr_emplace<>) with libc++ 3.9: 16 bytes. // - sizeof(*this): 72 bytes. return sizeof(*this) + 16 + // shared_ptr control block once_.memory_footprint_excluding_this() + base_.filename().capacity(); } private: friend class ::kudu::FileCache; Status Init() { return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this); } Status InitOnce() { return ReopenFileIfNecessary(nullptr); } Status ReopenFileIfNecessary( ScopedOpenedDescriptor<RandomAccessFile>* out) const { ScopedOpenedDescriptor<RandomAccessFile> found(base_.LookupFromCache()); CHECK(!base_.invalidated()); if (found.opened()) { // The file is already open in the cache, return it. if (out) { *out = std::move(found); } return Status::OK(); } // The file was evicted, reopen it. unique_ptr<RandomAccessFile> f; RandomAccessFileOptions opts; opts.is_sensitive = true; RETURN_NOT_OK(base_.env()->NewRandomAccessFile(opts, base_.filename(), &f)); // The cache will take ownership of the newly opened file. ScopedOpenedDescriptor<RandomAccessFile> opened( base_.InsertIntoCache(f.release())); if (out) { *out = std::move(opened); } return Status::OK(); } BaseDescriptor<RandomAccessFile> base_; KuduOnceDynamic once_; DISALLOW_COPY_AND_ASSIGN(Descriptor); }; } // namespace internal const char* const FileCache::kAlreadyDeleted = "File already marked as deleted"; FileCache::FileCache(const string& cache_name, Env* env, int max_open_files, const scoped_refptr<MetricEntity>& entity) : env_(env), cache_name_(cache_name), eviction_cb_(new EvictionCallback()), cache_(NewCache(max_open_files, cache_name)), running_(1) { if (entity) { unique_ptr<FileCacheMetrics> metrics(new FileCacheMetrics(entity)); cache_->SetMetrics(std::move(metrics), Cache::ExistingMetricsPolicy::kKeep); } LOG(INFO) << Substitute("Constructed file cache $0 with capacity $1", cache_name, max_open_files); } FileCache::~FileCache() { running_.CountDown(); if (descriptor_expiry_thread_) { descriptor_expiry_thread_->Join(); } } Status FileCache::Init() { return Thread::Create("cache", Substitute("$0-evict", cache_name_), [this]() { this->RunDescriptorExpiry(); }, &descriptor_expiry_thread_); } template <> Status FileCache::DoOpenFile(const string& file_name, shared_ptr<internal::Descriptor<RWFile>>* file, bool* created_desc) { shared_ptr<internal::Descriptor<RWFile>> d; bool cd; { std::lock_guard<simple_spinlock> l(lock_); d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &rwf_descs_, &cd); DCHECK(d); #ifndef NDEBUG // Enforce the invariant that a particular file name may only be used by one // descriptor at a time. This is expensive so it's only done in DEBUG mode. bool ignored; CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_, &ignored)); #endif } if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); } *file = std::move(d); *created_desc = cd; return Status::OK(); } template <> Status FileCache::DoOpenFile(const string& file_name, shared_ptr<internal::Descriptor<RandomAccessFile>>* file, bool* created_desc) { shared_ptr<internal::Descriptor<RandomAccessFile>> d; bool cd; { std::lock_guard<simple_spinlock> l(lock_); d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &raf_descs_, &cd); DCHECK(d); #ifndef NDEBUG // Enforce the invariant that a particular file name may only be used by one // descriptor at a time. This is expensive so it's only done in DEBUG mode. bool ignored; CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_, &ignored)); #endif } if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); } *file = std::move(d); *created_desc = cd; return Status::OK(); } template <> Status FileCache::OpenFile<Env::CREATE_OR_OPEN>(const string& file_name, shared_ptr<RWFile>* file) { shared_ptr<internal::Descriptor<RWFile>> d; bool ignored; RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored)); // Check that the underlying file can be opened (no-op for found descriptors). RETURN_NOT_OK(d->Init<Env::CREATE_OR_OPEN>()); *file = std::move(d); return Status::OK(); } template <> Status FileCache::OpenFile<Env::MUST_CREATE>(const string& file_name, shared_ptr<RWFile>* file) { shared_ptr<internal::Descriptor<RWFile>> d; bool created_desc; RETURN_NOT_OK(DoOpenFile(file_name, &d, &created_desc)); if (!created_desc) { return Status::AlreadyPresent("file already exists", file_name); } // Check that the underlying file can be opened (no-op for found descriptors). RETURN_NOT_OK(d->Init<Env::MUST_CREATE>()); *file = std::move(d); return Status::OK(); } template <> Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name, shared_ptr<RWFile>* file) { shared_ptr<internal::Descriptor<RWFile>> d; bool ignored; RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored)); // Check that the underlying file can be opened (no-op for found descriptors). RETURN_NOT_OK(d->Init<Env::MUST_EXIST>()); *file = std::move(d); return Status::OK(); } template <> Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name, shared_ptr<RandomAccessFile>* file) { shared_ptr<internal::Descriptor<RandomAccessFile>> d; bool ignored; RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored)); // Check that the underlying file can be opened (no-op for found descriptors). RETURN_NOT_OK(d->Init()); *file = std::move(d); return Status::OK(); } Status FileCache::DeleteFile(const string& file_name) { // Mark any outstanding descriptor as deleted. Because there may only be one // descriptor per file name, we can short circuit the search if we find a // descriptor in the first map. { std::lock_guard<simple_spinlock> l(lock_); bool ignored; { auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_, &ignored); if (d) { if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); } d->base_.MarkDeleted(); return Status::OK(); } } { auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_, &ignored); if (d) { if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); } d->base_.MarkDeleted(); return Status::OK(); } } } // There are no outstanding descriptors. Delete the file now. // // Make sure it's been fully evicted from the cache (perhaps it was opened // previously?) so that the filesystem can reclaim the file data instantly. cache_->Erase(file_name); return env_->DeleteFile(file_name); } void FileCache::Invalidate(const string& file_name) { // Ensure that there are invalidated descriptors in both maps for this // filename. This ensures that any concurrent opens during this method will // see the invalidation and result in a CHECK failure. // // Note: this temporarily violates the invariant that no two descriptors may // share the same file name. That's OK because the invalidation CHECK failure // occurs before the client trips on the broken invariant. shared_ptr<internal::Descriptor<RWFile>> rwf_desc; shared_ptr<internal::Descriptor<RandomAccessFile>> raf_desc; { std::lock_guard<simple_spinlock> l(lock_); bool ignored; rwf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &rwf_descs_, &ignored); DCHECK(rwf_desc); rwf_desc->base_.MarkInvalidated(); raf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &raf_descs_, &ignored); DCHECK(raf_desc); raf_desc->base_.MarkInvalidated(); } // Remove it from the cache so that if the same path is opened again, we // will re-open a new FD rather than retrieving one that might have been // cached prior to invalidation. cache_->Erase(file_name); // Remove the invalidated descriptors from the maps. We are guaranteed they // are still there because we've held a strong references to them for the // duration of this method, and no other methods erase strong references from // the maps. { std::lock_guard<simple_spinlock> l(lock_); CHECK_EQ(1, rwf_descs_.erase(file_name)); CHECK_EQ(1, raf_descs_.erase(file_name)); } } size_t FileCache::NumDescriptorsForTests() const { std::lock_guard<simple_spinlock> l(lock_); return rwf_descs_.size() + raf_descs_.size(); } string FileCache::ToDebugString() const { string ret; // We need to iterate through the descriptor maps, so make temporary copies // of them. DescriptorMap<RWFile> rwfs_copy; DescriptorMap<RandomAccessFile> rafs_copy; { std::lock_guard<simple_spinlock> l(lock_); rwfs_copy = rwf_descs_; rafs_copy = raf_descs_; } // Dump the contents of the copies. ret += MapToDebugString(rwfs_copy, "rwf"); ret += MapToDebugString(rafs_copy, "raf"); return ret; } template <class FileType> string FileCache::MapToDebugString(const DescriptorMap<FileType>& descs, const string& prefix) { string ret; for (const auto& e : descs) { bool strong = false; bool deleted = false; bool opened = false; shared_ptr<internal::Descriptor<FileType>> d = e.second.lock(); if (d) { strong = true; if (d->base_.deleted()) { deleted = true; } internal::ScopedOpenedDescriptor<FileType> sod(d->base_.LookupFromCache()); if (sod.opened()) { opened = true; } } if (strong) { ret += Substitute("$0: $1 (S$2$3)\n", prefix, e.first, deleted ? "D" : "", opened ? "O" : ""); } else { ret += Substitute("$0: $1\n", prefix, e.first); } } return ret; } template <class FileType> shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked( const string& file_name, FindMode mode, DescriptorMap<FileType>* descs, bool* created_desc) { DCHECK(lock_.is_locked()); shared_ptr<internal::Descriptor<FileType>> d; auto it = descs->find(file_name); if (it != descs->end()) { // Found the descriptor. Has it expired? d = it->second.lock(); if (d) { CHECK(!d->base_.invalidated()); // Descriptor is still valid, return it. VLOG(2) << "Found existing descriptor: " << file_name; *created_desc = false; return d; } // Descriptor has expired; erase it and pretend we found nothing. descs->erase(it); } if (mode == FindMode::CREATE_IF_NOT_EXIST) { d = std::make_shared<internal::Descriptor<FileType>>(this, file_name); EmplaceOrDie(descs, file_name, d); VLOG(2) << "Created new descriptor: " << file_name; *created_desc = true; } else { *created_desc = false; } return d; } template <class FileType> void FileCache::ExpireDescriptorsFromMap(DescriptorMap<FileType>* descs) { for (auto it = descs->begin(); it != descs->end();) { if (it->second.expired()) { it = descs->erase(it); } else { it++; } } } void FileCache::RunDescriptorExpiry() { while (!running_.WaitFor(MonoDelta::FromMilliseconds( FLAGS_file_cache_expiry_period_ms))) { std::lock_guard<simple_spinlock> l(lock_); ExpireDescriptorsFromMap(&rwf_descs_); ExpireDescriptorsFromMap(&raf_descs_); } } } // namespace kudu