extensions/procfs/processors/ProcFsMonitor.h (109 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <string> #include <memory> #include <map> #include <vector> #include <utility> #include <optional> #include "../ProcFs.h" #include "core/Processor.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "core/RelationshipDefinition.h" #include "core/logging/LoggerFactory.h" #include "core/logging/Logger.h" #include "utils/Enum.h" #include "rapidjson/stream.h" #include "rapidjson/document.h" namespace org::apache::nifi::minifi::extensions::procfs { enum class OutputFormat { JSON, OpenTelemetry }; enum class OutputCompactness { Compact, Pretty }; enum class ResultRelativeness { Relative, Absolute }; class ProcFsMonitor final : public core::ProcessorImpl { public: explicit ProcFsMonitor(const std::string_view name, utils::Identifier uuid = utils::Identifier()) : ProcessorImpl(name, uuid) { logger_ = core::logging::LoggerFactory<ProcFsMonitor>::getLogger(uuid_); } ~ProcFsMonitor() override = default; EXTENSIONAPI static constexpr const char* Description = "This processor can create FlowFiles with various performance data through the proc pseudo-filesystem. (Linux only)"; EXTENSIONAPI static constexpr auto OutputFormatProperty = core::PropertyDefinitionBuilder<magic_enum::enum_count<OutputFormat>()>::createProperty("Output Format") .withDescription("The output type of the new flowfile") .withAllowedValues(magic_enum::enum_names<OutputFormat>()) .withDefaultValue(magic_enum::enum_name(OutputFormat::JSON)) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto OutputCompactnessProperty = core::PropertyDefinitionBuilder<magic_enum::enum_count<OutputCompactness>()>::createProperty("Output Compactness") .withDescription("The output format of the new flowfile") .withAllowedValues(magic_enum::enum_names<OutputCompactness>()) .withDefaultValue(magic_enum::enum_name(OutputCompactness::Pretty)) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto DecimalPlaces = core::PropertyDefinitionBuilder<>::createProperty("Round to decimal places") .withDescription("The number of decimal places to round the values to (blank for no rounding)") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .build(); EXTENSIONAPI static constexpr auto ResultRelativenessProperty = core::PropertyDefinitionBuilder<magic_enum::enum_count<ResultRelativeness>()>::createProperty("Result Type") .withDescription("Absolute returns the current procfs values, relative calculates the usage between triggers") .withAllowedValues(magic_enum::enum_names<ResultRelativeness>()) .withDefaultValue(magic_enum::enum_name(ResultRelativeness::Absolute)) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ OutputFormatProperty, OutputCompactnessProperty, DecimalPlaces, ResultRelativenessProperty }); EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All files are routed to success"}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; EXTENSIONAPI static constexpr bool IsSingleThreaded = true; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; private: rapidjson::Value& prepareJSONBody(rapidjson::Document& root); void processCPUInformation(const std::vector<std::pair<std::string, CpuStatData>>& current_cpu_stats, rapidjson::Value& body, rapidjson::Document::AllocatorType& alloc); void processDiskInformation(const std::vector<std::pair<std::string, DiskStatData>>& current_disk_stats, rapidjson::Value& body, rapidjson::Document::AllocatorType& alloc); void processNetworkInformation(const std::vector<std::pair<std::string, NetDevData>>& current_net_devs, rapidjson::Value& body, rapidjson::Document::AllocatorType& alloc); void processProcessInformation(const std::map<pid_t, ProcessStat>& current_process_stats, std::optional<std::chrono::duration<double>> last_cpu_period, rapidjson::Value& body, rapidjson::Document::AllocatorType& alloc); void processMemoryInformation(rapidjson::Value& body, rapidjson::Document::AllocatorType& alloc); void refreshMembers(std::vector<std::pair<std::string, CpuStatData>>&& current_cpu_stats, std::vector<std::pair<std::string, DiskStatData>>&& current_disk_stats, std::vector<std::pair<std::string, NetDevData>>&& current_net_devs, std::map<pid_t, ProcessStat>&& current_process_stats); OutputFormat output_format_ = OutputFormat::JSON; OutputCompactness output_compactness_ = OutputCompactness::Pretty; ResultRelativeness result_relativeness_ = ResultRelativeness::Absolute; std::optional<uint8_t> decimal_places_; ProcFs proc_fs_; std::vector<std::pair<std::string, CpuStatData>> last_cpu_stats_; std::vector<std::pair<std::string, NetDevData>> last_net_devs_; std::vector<std::pair<std::string, DiskStatData>> last_disk_stats_; std::map<pid_t, ProcessStat> last_process_stats_; std::optional<std::chrono::steady_clock::time_point> last_trigger_; }; } // namespace org::apache::nifi::minifi::extensions::procfs