libminifi/include/Connection.h (137 lines of code) (raw):

/** * @file Connection.h * Connection class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <memory> #include <set> #include <string> #include <vector> #include <map> #include <mutex> #include <atomic> #include <algorithm> #include <utility> #include "core/Core.h" #include "core/Connectable.h" #include "core/logging/Logger.h" #include "core/Relationship.h" #include "core/FlowFile.h" #include "core/Repository.h" #include "utils/FlowFileQueue.h" #include "minifi-cpp/Connection.h" namespace org::apache::nifi::minifi { namespace test::utils { struct ConnectionTestAccessor; } // namespace test::utils class ConnectionImpl : public core::ConnectableImpl, public virtual Connection { friend struct test::utils::ConnectionTestAccessor; public: explicit ConnectionImpl(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::string_view name); explicit ConnectionImpl(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::string_view name, const utils::Identifier &uuid); explicit ConnectionImpl(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::string_view name, const utils::Identifier &uuid, const utils::Identifier &srcUUID); explicit ConnectionImpl(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::string_view name, const utils::Identifier &uuid, const utils::Identifier &srcUUID, const utils::Identifier &destUUID); explicit ConnectionImpl(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<SwapManager> swap_manager, std::string_view name, const utils::Identifier& uuid); ~ConnectionImpl() override = default; ConnectionImpl(const ConnectionImpl &parent) = delete; ConnectionImpl &operator=(const ConnectionImpl &parent) = delete; static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_COUNT = 2000; static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE = 100_MB; void setSourceUUID(const utils::Identifier &uuid) override { src_uuid_ = uuid; } void setDestinationUUID(const utils::Identifier &uuid) override { dest_uuid_ = uuid; } utils::Identifier getSourceUUID() const override { return src_uuid_; } utils::Identifier getDestinationUUID() const override { return dest_uuid_; } void setSource(core::Connectable* source) override { source_connectable_ = source; } core::Connectable* getSource() const override { return source_connectable_; } void setDestination(core::Connectable* dest) override { dest_connectable_ = dest; } core::Connectable* getDestination() const override { return dest_connectable_; } void addRelationship(core::Relationship relationship) override { relationships_.insert(std::move(relationship)); } const std::set<core::Relationship> &getRelationships() const override { return relationships_; } void setBackpressureThresholdCount(uint64_t size) override { backpressure_threshold_count_ = size; } uint64_t getBackpressureThresholdCount() const override { return backpressure_threshold_count_; } void setBackpressureThresholdDataSize(uint64_t size) override { backpressure_threshold_data_size_ = size; } uint64_t getBackpressureThresholdDataSize() const override { return backpressure_threshold_data_size_; } void setSwapThreshold(uint64_t size) override { queue_.setTargetSize(size); queue_.setMinSize(size / 2); queue_.setMaxSize(size * 3 / 2); } void setFlowExpirationDuration(std::chrono::milliseconds duration) override { expired_duration_ = duration; } std::chrono::milliseconds getFlowExpirationDuration() const override { return expired_duration_; } void setDropEmptyFlowFiles(bool drop) override { drop_empty_ = drop; } bool getDropEmptyFlowFiles() const override { return drop_empty_; } bool isEmpty() const override; bool backpressureThresholdReached() const override; uint64_t getQueueSize() const override { std::lock_guard<std::mutex> lock(mutex_); return queue_.size(); } uint64_t getQueueDataSize() override { return queued_data_size_; } void put(const std::shared_ptr<core::FlowFile>& flow) override; void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) override; std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) override; void drain(bool delete_permanently) override; void yield() override {} bool isWorkAvailable() override { const std::lock_guard<std::mutex> lock{mutex_}; return queue_.isWorkAvailable(); } bool isRunning() const override { return true; } protected: utils::Identifier src_uuid_; utils::Identifier dest_uuid_; std::set<core::Relationship> relationships_; core::Connectable* source_connectable_ = nullptr; core::Connectable* dest_connectable_ = nullptr; std::atomic<uint64_t> backpressure_threshold_count_ = DEFAULT_BACKPRESSURE_THRESHOLD_COUNT; std::atomic<uint64_t> backpressure_threshold_data_size_ = DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE; std::atomic<std::chrono::milliseconds> expired_duration_ = std::chrono::milliseconds(0); std::shared_ptr<core::Repository> flow_repository_; std::shared_ptr<core::ContentRepository> content_repo_; private: bool drop_empty_ = false; mutable std::mutex mutex_; std::atomic<uint64_t> queued_data_size_ = 0; utils::FlowFileQueue queue_; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<Connection>::getLogger(); }; } // namespace org::apache::nifi::minifi