libminifi/include/RemoteProcessorGroupPort.h (176 lines of code) (raw):

/** * @file RemoteProcessorGroupPort.h * RemoteProcessorGroupPort class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <string> #include <utility> #include <vector> #include <mutex> #include <memory> #include <stack> #include "http/BaseHTTPClient.h" #include "concurrentqueue.h" #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "core/RelationshipDefinition.h" #include "sitetosite/SiteToSiteClient.h" #include "minifi-cpp/controllers/SSLContextService.h" #include "core/logging/LoggerFactory.h" #include "utils/Export.h" #include "core/ClassLoader.h" namespace org::apache::nifi::minifi { /** * Count down latch implementation that's used across * all threads of the RPG. This is okay since the latch increments * and decrements based on its construction. Using RAII we should * never have the concern of thread safety. */ class RPGLatch { public: RPGLatch(bool increment = true) { // NOLINT static std::atomic<int> latch_count(0); count = &latch_count; if (increment) count++; } ~RPGLatch() { count--; } int getCount() { return *count; } private: std::atomic<int> *count; }; struct RPG { std::string host_; int port_; std::string protocol_; }; class RemoteProcessorGroupPort : public core::ProcessorImpl { public: RemoteProcessorGroupPort(std::string_view name, std::string url, std::shared_ptr<Configure> configure, const utils::Identifier &uuid = {}) : core::ProcessorImpl(name, uuid), configure_(std::move(configure)), direction_(sitetosite::SEND), transmitting_(false), timeout_(0), bypass_rest_api_(false), ssl_service(nullptr), logger_(core::logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger(uuid)) { client_type_ = sitetosite::CLIENT_TYPE::RAW; protocol_uuid_ = uuid; site2site_secure_ = false; peer_index_ = -1; // REST API port and host setURL(std::move(url)); } virtual ~RemoteProcessorGroupPort() = default; MINIFIAPI static constexpr auto hostName = core::PropertyDefinitionBuilder<>::createProperty("Host Name") .withDescription("Remote Host Name.") .build(); MINIFIAPI static constexpr auto SSLContext = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service") .withDescription("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") .build(); MINIFIAPI static constexpr auto port = core::PropertyDefinitionBuilder<>::createProperty("Port") .withDescription("Remote Port") .build(); MINIFIAPI static constexpr auto portUUID = core::PropertyDefinitionBuilder<>::createProperty("Port UUID") .withDescription("Specifies remote NiFi Port UUID.") .build(); MINIFIAPI static constexpr auto idleTimeout = core::PropertyDefinitionBuilder<>::createProperty("Idle Timeout") .withDescription("Max idle time for remote service") .isRequired(true) .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) .withDefaultValue("15 s") .build(); MINIFIAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ hostName, SSLContext, port, portUUID, idleTimeout }); MINIFIAPI static constexpr auto relation = core::RelationshipDefinition{"", ""}; MINIFIAPI static constexpr auto Relationships = std::array{relation}; MINIFIAPI static constexpr bool SupportsDynamicProperties = false; MINIFIAPI static constexpr bool SupportsDynamicRelationships = false; MINIFIAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; MINIFIAPI static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; void setDirection(sitetosite::TransferDirection direction) { direction_ = direction; if (direction_ == sitetosite::RECEIVE) this->setTriggerWhenEmpty(true); } void setTimeout(uint64_t timeout) { timeout_ = timeout; } void setTransmitting(bool val) { transmitting_ = val; } void setInterface(const std::string &ifc) { local_network_interface_ = ifc; } std::string getInterface() { return local_network_interface_; } /** * Sets the url. Supports a CSV */ void setURL(std::string val) { auto urls = utils::string::split(val, ","); for (const auto& url : urls) { http::URL parsed_url{utils::string::trim(url)}; if (parsed_url.isValid()) { logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url, parsed_url.hostPort()); nifi_instances_.push_back({parsed_url.host(), parsed_url.port(), parsed_url.protocol()}); } else { logger_->log_error("Could not parse RPG URL '{}'", url); } } } std::vector<RPG> getInstances() const { return nifi_instances_; } void setHTTPProxy(const http::HTTPProxy &proxy) { this->proxy_ = proxy; } http::HTTPProxy getHTTPProxy() { return this->proxy_; } // refresh remoteSite2SiteInfo via nifi rest api std::pair<std::string, int> refreshRemoteSite2SiteInfo(); // refresh site2site peer list void refreshPeerList(); void notifyStop() override; void enableHTTP() { client_type_ = sitetosite::HTTP; } protected: /** * Non static in case anything is loaded when this object is re-scheduled */ bool is_http_disabled() { auto ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient"); if (ptr != nullptr) { delete ptr; return false; } else { return true; } } std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol(bool create); void returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> protocol); moodycamel::ConcurrentQueue<std::unique_ptr<sitetosite::SiteToSiteClient>> available_protocols_; std::shared_ptr<Configure> configure_; // Transaction Direction sitetosite::TransferDirection direction_; // Transmitting std::atomic<bool> transmitting_; // timeout uint64_t timeout_; // local network interface std::string local_network_interface_; utils::Identifier protocol_uuid_; std::chrono::milliseconds idle_timeout_ = std::chrono::seconds(15); // rest API end point info std::vector<struct RPG> nifi_instances_; // http proxy http::HTTPProxy proxy_; bool bypass_rest_api_; sitetosite::CLIENT_TYPE client_type_; // Remote Site2Site Info bool site2site_secure_; std::vector<sitetosite::PeerStatus> peers_; std::atomic<int> peer_index_; std::mutex peer_mutex_; std::string rest_user_name_; std::string rest_password_; std::shared_ptr<controllers::SSLContextService> ssl_service; private: std::shared_ptr<core::logging::Logger> logger_; static const char* RPG_SSL_CONTEXT_SERVICE_NAME; }; } // namespace org::apache::nifi::minifi