libminifi/include/sitetosite/Peer.h (251 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <errno.h> #include <stdio.h> #include <atomic> #include <map> #include <memory> #include <mutex> #include <string> #include <utility> #include "core/logging/LoggerFactory.h" #include "core/Property.h" #include "io/BaseStream.h" #include "io/BufferStream.h" #include "properties/Configure.h" #include "http/BaseHTTPClient.h" #include "utils/TimeUtil.h" #include "io/NetworkPrioritizer.h" namespace org::apache::nifi::minifi::sitetosite { class Peer { public: explicit Peer(const utils::Identifier &port_id, const std::string &host, uint16_t port, bool secure = false) : host_(host), port_(port), secure_(secure) { port_id_ = port_id; } explicit Peer(const std::string &host, uint16_t port, bool secure = false) : host_(host), port_(port), secure_(secure) { } explicit Peer(const Peer &other) : host_(other.host_), port_(other.port_), secure_(other.secure_) { port_id_ = other.port_id_; } explicit Peer(Peer &&other) : host_(std::move(other.host_)), port_(std::move(other.port_)), secure_(std::move(other.secure_)) { port_id_ = other.port_id_; } uint16_t getPort() const { return port_; } const std::string &getHost() const { return host_; } bool isSecure() const { return secure_; } utils::Identifier getPortId() const { return port_id_; } protected: std::string host_; uint16_t port_; utils::Identifier port_id_; // secore comms bool secure_; }; class PeerStatus { public: PeerStatus(const std::shared_ptr<Peer> &peer, uint32_t flow_file_count, bool query_for_peers) : peer_(peer), flow_file_count_(flow_file_count), query_for_peers_(query_for_peers) { } PeerStatus(const PeerStatus &other) = default; PeerStatus(PeerStatus &&other) = default; PeerStatus& operator=(const PeerStatus &other) = default; PeerStatus& operator=(PeerStatus &&other) = default; const std::shared_ptr<Peer> &getPeer() const { return peer_; } uint32_t getFlowFileCount() { return flow_file_count_; } bool getQueryForPeers() { return query_for_peers_; } protected: std::shared_ptr<Peer> peer_; uint32_t flow_file_count_; bool query_for_peers_; }; static const char MAGIC_BYTES[] = { 'N', 'i', 'F', 'i' }; // Site2SitePeer Class class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStreamImpl { public: SiteToSitePeer() : stream_(nullptr), host_(""), port_(-1) { } /* * Create a new site2site peer */ explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BaseStream> injected_socket, const std::string host, uint16_t port, const std::string &ifc) : SiteToSitePeer(host, port, ifc) { stream_ = std::move(injected_socket); } explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &ifc) : stream_(nullptr), host_(host), port_(port), timeout_(std::chrono::seconds(30)), logger_(core::logging::LoggerFactory<SiteToSitePeer>::getLogger()) { url_ = "nifi://" + host_ + ":" + std::to_string(port_); yield_expiration_ = std::chrono::system_clock::time_point(); timeout_ = std::chrono::seconds(30); local_network_interface_ = io::NetworkInterface(ifc, nullptr); } explicit SiteToSitePeer(SiteToSitePeer &&ss) : stream_(ss.stream_.release()), host_(std::move(ss.host_)), port_(std::move(ss.port_)), local_network_interface_(std::move(ss.local_network_interface_)), proxy_(std::move(ss.proxy_)), logger_(std::move(ss.logger_)) { yield_expiration_.store(ss.yield_expiration_); timeout_.store(ss.timeout_); url_ = std::move(ss.url_); } // Destructor ~SiteToSitePeer() { Close(); } // Set Processor yield period in MilliSecond void setYieldPeriodMsec(std::chrono::milliseconds period) { yield_period_msec_ = period; } // get URL std::string getURL() { return url_; } // setInterface void setInterface(std::string &ifc) { local_network_interface_ = io::NetworkInterface(ifc, nullptr); } std::string getInterface() { return local_network_interface_.getInterface(); } // Get Processor yield period in MilliSecond std::chrono::milliseconds getYieldPeriodMsec(void) { return (yield_period_msec_); } // Yield based on the yield period void yield() { yield_expiration_ = std::chrono::system_clock::now() + yield_period_msec_.load(); } // setHostName void setHostName(std::string host_) { this->host_ = host_; url_ = "nifi://" + host_ + ":" + std::to_string(port_); } // setPort void setPort(uint16_t port_) { this->port_ = port_; url_ = "nifi://" + host_ + ":" + std::to_string(port_); } // getHostName std::string getHostName() { return host_; } // getPort uint16_t getPort() { return port_; } // Yield based on the input time void yield(std::chrono::milliseconds time) { yield_expiration_ = (std::chrono::system_clock::now() + time); } // whether need be to yield bool isYield() { return yield_expiration_.load() >= std::chrono::system_clock::now(); } // clear yield expiration void clearYield() { yield_expiration_ = std::chrono::system_clock::time_point(); } // Yield based on the yield period void yield(std::string portId) { std::lock_guard<std::mutex> lock(mutex_); yield_expiration_PortIdMap[portId] = std::chrono::system_clock::now() + yield_period_msec_.load(); } // Yield based on the input time void yield(std::string portId, std::chrono::milliseconds time) { yield_expiration_PortIdMap[portId] = std::chrono::system_clock::now() + time; } // whether need be to yield bool isYield(std::string portId) { std::lock_guard<std::mutex> lock(mutex_); auto it = this->yield_expiration_PortIdMap.find(portId); if (it != yield_expiration_PortIdMap.end()) { auto yieldExpiration = it->second; return (yieldExpiration >= std::chrono::system_clock::now()); } else { return false; } } // clear yield expiration void clearYield(std::string portId) { std::lock_guard<std::mutex> lock(mutex_); auto it = this->yield_expiration_PortIdMap.find(portId); if (it != yield_expiration_PortIdMap.end()) { yield_expiration_PortIdMap.erase(portId); } } // setTimeout void setTimeout(std::chrono::milliseconds time) { timeout_ = time; } // getTimeout std::chrono::milliseconds getTimeout() { return timeout_; } void setHTTPProxy(const http::HTTPProxy &proxy) { this->proxy_ = proxy; } http::HTTPProxy getHTTPProxy() { return this->proxy_; } void setStream(std::unique_ptr<org::apache::nifi::minifi::io::BaseStream> stream) { stream_ = nullptr; if (stream) stream_ = std::move(stream); } org::apache::nifi::minifi::io::BaseStream *getStream() { return stream_.get(); } using BaseStream::write; using BaseStream::read; size_t write(const uint8_t* data, size_t len) override { return stream_->write(data, len); } size_t read(std::span<std::byte> data) override { return stream_->read(data); } // open connection to the peer bool Open(); // close connection to the peer void Close(); /** * Move assignment operator. */ SiteToSitePeer& operator=(SiteToSitePeer&& other) { if (this == &other) { return *this; } stream_ = std::move(other.stream_); host_ = std::move(other.host_); port_ = std::move(other.port_); local_network_interface_ = std::move(other.local_network_interface_); yield_expiration_ = std::chrono::system_clock::time_point(); timeout_ = std::chrono::seconds(30); url_ = "nifi://" + host_ + ":" + std::to_string(port_); return *this; } SiteToSitePeer(const SiteToSitePeer &parent) = delete; SiteToSitePeer &operator=(const SiteToSitePeer &parent) = delete; private: std::unique_ptr<org::apache::nifi::minifi::io::BaseStream> stream_; std::string host_; uint16_t port_; io::NetworkInterface local_network_interface_; http::HTTPProxy proxy_; // Mutex for protection std::mutex mutex_; // URL std::string url_; // socket timeout; std::atomic<std::chrono::milliseconds> timeout_{}; // Yield Period in Milliseconds std::atomic<std::chrono::milliseconds> yield_period_msec_; // Yield Expiration std::atomic<std::chrono::time_point<std::chrono::system_clock>> yield_expiration_{}; // Yield Expiration per destination PortID std::map<std::string, std::chrono::time_point<std::chrono::system_clock>> yield_expiration_PortIdMap; // Logger std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SiteToSitePeer>::getLogger(); }; } // namespace org::apache::nifi::minifi::sitetosite