libminifi/include/utils/ResourceQueue.h (105 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <cstdint> #include <memory> #include <optional> #include <string> #include <utility> #include <functional> #include <mutex> #include <condition_variable> #include "core/logging/Logger.h" #include "concurrentqueue.h" #include "MinifiConcurrentQueue.h" namespace org::apache::nifi::minifi::utils { /* * utils::ResourceQueue a threadsafe resource pool that lends out existing resources or creates them if necessary. * getResource will return an existing unused resource or use the create_resource function to create one. * If the number of existing resources reached the maximum_number_of_creatable_resources_, the call will block until a resource is returned to the queue. * The lent out resource is in a ResourceWrapper that returns the resource to the queue on destruction. * */ template<class ResourceType> class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<ResourceType>> { public: class ResourceWrapper { public: ResourceWrapper(std::weak_ptr<ResourceQueue<ResourceType>> queue, std::unique_ptr<ResourceType> resource) : queue_(std::move(queue)), resource_(std::move(resource)) {} ResourceWrapper(ResourceWrapper&& src) = default; ResourceWrapper(const ResourceWrapper&) = delete; ~ResourceWrapper() { if (auto queue = queue_.lock()) queue->returnResource(std::move(resource_)); } ResourceWrapper& operator=(ResourceWrapper&&) = default; ResourceWrapper& operator=(const ResourceWrapper&) = delete; ResourceType& operator*() const { return *resource_; } ResourceType* operator->() const noexcept { return resource_.operator->(); } ResourceType* get() const { return resource_.get(); } private: std::weak_ptr<ResourceQueue<ResourceType>> queue_; std::unique_ptr<ResourceType> resource_; }; static auto create(std::function<std::unique_ptr<ResourceType>()> creator, std::optional<size_t> maximum_number_of_creatable_resources = std::nullopt, std::optional<std::function<void(ResourceType&)>> reset_fn = std::nullopt, std::shared_ptr<core::logging::Logger> logger = nullptr); [[nodiscard]] ResourceWrapper getResource() { std::unique_ptr<ResourceType> resource; // Use an existing resource, if one is available if (internal_queue_.tryDequeue(resource)) { logDebug("Using available [%p] resource instance", resource.get()); return ResourceWrapper(this->weak_from_this(), std::move(resource)); } else { const std::lock_guard<std::mutex> lock(counter_mutex_); if (!maximum_number_of_creatable_resources_ || resources_created_ < maximum_number_of_creatable_resources_) { ++resources_created_; resource = create_new_resource_(); logDebug("Created new [%p] resource instance. Number of instances: %d%s.", resource.get(), resources_created_, maximum_number_of_creatable_resources_ ? " / " + std::to_string(*maximum_number_of_creatable_resources_) : ""); return ResourceWrapper(this->weak_from_this(), std::move(resource)); } } logDebug("Waiting for resource"); if (!internal_queue_.dequeueWait(resource)) { throw std::runtime_error("No resource available"); } return ResourceWrapper(this->weak_from_this(), std::move(resource)); } protected: ResourceQueue(std::function<std::unique_ptr<ResourceType>()> create_new_resource, std::optional<size_t> maximum_number_of_creatable_resources, std::optional<std::function<void(ResourceType&)>> reset_fn, std::shared_ptr<core::logging::Logger> logger) : create_new_resource_(std::move(create_new_resource)), maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources), reset_fn_(std::move(reset_fn)), logger_(std::move(logger)) { } private: void returnResource(std::unique_ptr<ResourceType> resource) { logDebug("Returning [%p] resource", resource.get()); if (reset_fn_) reset_fn_.value()(*resource); internal_queue_.enqueue(std::move(resource)); } template<typename ...Args> void logDebug(const char * const format, Args&& ...args) { if (logger_) logger_->log_debug(format, std::forward<Args>(args)...); } const std::function<std::unique_ptr<ResourceType>()> create_new_resource_; const std::optional<size_t> maximum_number_of_creatable_resources_; const std::optional<std::function<void(ResourceType&)>> reset_fn_; const std::shared_ptr<core::logging::Logger> logger_; ConditionConcurrentQueue<std::unique_ptr<ResourceType>> internal_queue_; size_t resources_created_ = 0; std::mutex counter_mutex_; struct make_shared_enabler; }; template<class ResourceType> struct ResourceQueue<ResourceType>::make_shared_enabler : public ResourceQueue<ResourceType> { template<typename... Args> make_shared_enabler(Args&& ... args) : ResourceQueue<ResourceType>(std::forward<Args>(args)...) {} }; template<class ResourceType> auto ResourceQueue<ResourceType>::create(std::function<std::unique_ptr<ResourceType>()> creator, std::optional<size_t> maximum_number_of_creatable_resources, std::optional<std::function<void(ResourceType&)>> reset_fn, std::shared_ptr<core::logging::Logger> logger) { return std::make_shared<make_shared_enabler>(std::move(creator), maximum_number_of_creatable_resources, std::move(reset_fn), std::move(logger)); } } // namespace org::apache::nifi::minifi::utils