libminifi/include/core/repository/LegacyVolatileContentRepository.h (65 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <map> #include <memory> #include <string> #include <string_view> #include "AtomicRepoEntries.h" #include "io/AtomicEntryStream.h" #include "core/ContentRepository.h" #include "properties/Configure.h" #include "core/Connectable.h" #include "core/logging/LoggerFactory.h" #include "utils/GeneralUtils.h" #include "VolatileRepositoryData.h" #include "utils/Literals.h" namespace org::apache::nifi::minifi::core::repository { /** * Purpose: Stages content into a volatile area of memory. Note that when the maximum number * of entries is consumed we will rollback a session to wait for others to be freed. */ class LegacyVolatileContentRepository : public core::ContentRepositoryImpl { public: static const char *minimal_locking; explicit LegacyVolatileContentRepository(std::string_view name = className<LegacyVolatileContentRepository>()) : core::ContentRepositoryImpl(name), repo_data_(15000, static_cast<size_t>(10_MiB * 0.75)), minimize_locking_(true), logger_(logging::LoggerFactory<LegacyVolatileContentRepository>::getLogger()) { } ~LegacyVolatileContentRepository() override { logger_->log_debug("Clearing repository"); if (!minimize_locking_) { std::lock_guard<std::mutex> lock(map_mutex_); for (const auto &item : master_list_) { delete item.second; } master_list_.clear(); } } uint64_t getRepositorySize() const override { return repo_data_.getRepositorySize(); } uint64_t getMaxRepositorySize() const override { return repo_data_.getMaxRepositorySize(); } uint64_t getRepositoryEntryCount() const override { return master_list_.size(); } bool isFull() const override { return repo_data_.isFull(); } bool initialize(const std::shared_ptr<Configure> &configure) override; std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append) override; std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override; bool exists(const minifi::ResourceClaim &claim) override; bool close(const minifi::ResourceClaim &claim) override { return remove(claim); } void clearOrphans() override { // there are no persisted orphans to delete } protected: bool removeKey(const std::string& content_path) override; private: VolatileRepositoryData repo_data_; bool minimize_locking_; // mutex and master list that represent a cache of Atomic entries. this exists so that we don't have to walk the atomic entry list. // The idea is to reduce the computational complexity while keeping access as maximally lock free as we can. std::mutex map_mutex_; std::map<ResourceClaim::Path, AtomicEntry<ResourceClaim::Path>*> master_list_; std::shared_ptr<logging::Logger> logger_; }; } // namespace org::apache::nifi::minifi::core::repository