libminifi/include/c2/C2Agent.h (104 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <functional> #include <future> #include <map> #include <memory> #include <mutex> #include <optional> #include <string> #include <thread> #include <utility> #include <vector> #include "../core/state/nodes/MetricsBase.h" #include "../core/state/Value.h" #include "core/state/UpdateController.h" #include "controllers/UpdatePolicyControllerService.h" #include "C2Payload.h" #include "C2Trigger.h" #include "C2Protocol.h" #include "io/validation.h" #include "HeartbeatReporter.h" #include "utils/Id.h" #include "utils/MinifiConcurrentQueue.h" #include "utils/ThreadPool.h" #include "utils/file/FileSystem.h" #include "C2Utils.h" namespace org::apache::nifi::minifi::c2 { /** * Purpose and Justification: C2 agent will be the mechanism that will abstract the protocol to do the work. * * The protocol represents a transformation layer into the objects seen in C2Payload. That transformation may * be minimal or extreme, depending on the protocol itself. * * Metrics Classes defined here: * * 0 HeartBeat -- RESERVED * 1-255 Defined by the configuration file. */ class C2Agent : public state::UpdateController { public: C2Agent(std::shared_ptr<Configure> configuration, std::weak_ptr<state::response::NodeReporter> node_reporter, std::shared_ptr<utils::file::FileSystem> filesystem, std::function<void()> request_restart); ~C2Agent() noexcept override { delete protocol_.load(); } void initialize(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* update_sink); void start() override; void stop() override; /** * Sends the heartbeat to ths server. Will include metrics * in the payload if they exist. */ void performHeartBeat(); std::optional<std::string> fetchFlow(const std::string& uri) const; /** * Serializes metrics into a payload. * @parem parent_paylaod parent payload into which we insert the newly generated payload. * @param name name of this metric * @param metrics metrics to include. */ static void serializeMetrics(C2Payload &metric_payload, const std::string &name, const std::vector<state::response::SerializedResponseNode> &metrics, bool is_container = false, bool is_collapsible = true); protected: /** * Check the collection of triggers for any updates that need to be handled. * This is an optional step */ void checkTriggers(); void configure(const std::shared_ptr<Configure> &configure, bool reconfigure = true); /** * Extract the payload * @param resp payload to be moved into the function. */ void extractPayload(const C2Payload &resp); /** * Enqueues a C2 server response for us to evaluate and parse. */ void enqueue_c2_server_response(C2Payload &&resp); /** * Enqueues a c2 payload for a response to the C2 server. */ void enqueue_c2_response(C2Payload &&resp) { requests.enqueue(std::move(resp)); } /** * Handles a C2 event requested by the server. * @param resp c2 server response. */ virtual void handle_c2_server_response(const C2ContentResponse &resp); void handle_clear(const C2ContentResponse &resp); /** * Handles an update request * @param C2ContentResponse response */ void handle_update(const C2ContentResponse &resp); /** * Handles a description request */ void handle_describe(const C2ContentResponse &resp); enum class UpdateResult { NO_UPDATE, UPDATE_SUCCESSFUL, UPDATE_FAILED }; /** * Updates a property */ UpdateResult update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime); void handle_transfer(const C2ContentResponse &resp); C2Payload bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files); /** * Creates configuration options C2 payload for response */ C2Payload prepareConfigurationOptions(const C2ContentResponse &resp) const; private: utils::TaskRescheduleInfo produce(); utils::TaskRescheduleInfo consume(); bool handleConfigurationUpdate(const C2ContentResponse &resp); void handlePropertyUpdate(const C2ContentResponse &resp); void handleAssetUpdate(const C2ContentResponse &resp); std::optional<std::string> resolveFlowUrl(const std::string& url) const; std::optional<std::string> resolveUrl(const std::string& url) const; static std::optional<std::string> getFlowIdFromConfigUpdate(const C2ContentResponse &resp); protected: std::timed_mutex metrics_mutex_; std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_; /** * Device information stored in the metrics format */ std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_; /** * Device information stored in the metrics format */ std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_; // responses for the the C2 agent. utils::ConcurrentQueue<C2Payload> responses; // requests that originate from the C2 server. utils::ConcurrentQueue<C2Payload> requests; // heart beat period. std::chrono::milliseconds heart_beat_period_; // maximum number of queued messages to send to the c2 server size_t max_c2_responses; // time point the last time we performed a heartbeat. std::chrono::steady_clock::time_point last_run_; // function that performs the heartbeat std::function<utils::TaskRescheduleInfo()> c2_producer_; // reference to the update sink, against which we will execute updates. state::StateMonitor* update_sink_; // functions that will be used for the udpate controller. std::vector<std::function<utils::TaskRescheduleInfo()>> functions_; std::shared_ptr<controllers::UpdatePolicyControllerService> update_service_; // controller service provider reference. core::controller::ControllerServiceProvider* controller_; state::Pausable* pause_handler_; // shared pointer to the configuration of this agent std::shared_ptr<Configure> configuration_; std::shared_ptr<utils::file::FileSystem> filesystem_; std::mutex heartbeat_mutex; std::vector<std::unique_ptr<HeartbeatReporter>> heartbeat_protocols_; std::vector<std::unique_ptr<C2Trigger>> triggers_; std::atomic<C2Protocol*> protocol_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<C2Agent>::getLogger(); utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_; std::vector<utils::Identifier> task_ids_; bool manifest_sent_{false}; const uint64_t C2RESPONSE_POLL_MS = 100; std::weak_ptr<state::response::NodeReporter> node_reporter_; std::atomic<bool> restart_needed_ = false; std::function<void()> request_restart_; }; } // namespace org::apache::nifi::minifi::c2