extensions/procfs/processors/ProcFsMonitor.cpp (295 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ProcFsMonitor.h"
#include <limits>
#include <utility>
#include <vector>
#include <unordered_map>
#include "core/Resource.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "../ProcFsJsonSerialization.h"
#include "utils/JsonCallback.h"
#include "utils/OpenTelemetryLogDataModelUtils.h"
#include "utils/gsl.h"
#include "utils/ProcessorConfigUtils.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::extensions::procfs {
void ProcFsMonitor::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void ProcFsMonitor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
  gsl_Expects(context);
  // Cache every configuration choice once per schedule so onTrigger does not re-read properties.
  auto& process_context = *context;
  output_format_ = utils::parseEnumProperty<OutputFormat>(process_context, OutputFormatProperty);
  output_compactness_ = utils::parseEnumProperty<OutputCompactness>(process_context, OutputCompactnessProperty);
  result_relativeness_ = utils::parseEnumProperty<ResultRelativeness>(process_context, ResultRelativenessProperty);
  setupDecimalPlacesFromProperties(process_context);
}
namespace {
// Number of per-core entries in a CPU sample: the first entry is the aggregate "cpu" line,
// every entry after it describes a single core. Requires at least one core entry.
size_t number_of_cores(const std::vector<std::pair<std::string, CpuStatData>>& cpu_stat) {
  gsl_Expects(cpu_stat.size() > 1);
  const size_t aggregate_entry_count = 1;
  return cpu_stat.size() - aggregate_entry_count;
}
bool cpu_stats_are_valid(const std::vector<std::pair<std::string, CpuStatData>>& cpu_stat) {
return cpu_stat.size() > 1 && cpu_stat[0].first == "cpu"; // needs the aggregate and at least one core information to be valid
}
std::optional<std::chrono::duration<double>> getAggregateCpuDiff(std::vector<std::pair<std::string, CpuStatData>>& current_cpu_stats,
std::vector<std::pair<std::string, CpuStatData>>& last_cpu_stats) {
if (current_cpu_stats.size() != last_cpu_stats.size())
return std::nullopt;
if (!cpu_stats_are_valid(current_cpu_stats) || !cpu_stats_are_valid(last_cpu_stats))
return std::nullopt;
return (current_cpu_stats[0].second.getTotal() - last_cpu_stats[0].second.getTotal()) / number_of_cores(current_cpu_stats);
}
} // namespace
void ProcFsMonitor::onTrigger(core::ProcessContext*, core::ProcessSession* session) {
  gsl_Expects(session);
  std::shared_ptr<core::FlowFile> flow_file = session->create();
  rapidjson::Document root{rapidjson::kObjectType};
  rapidjson::Value& body = prepareJSONBody(root);

  // Take one fresh sample of every /proc source for this trigger.
  auto cpu_stats = proc_fs_.getCpuStats();
  auto disk_stats = proc_fs_.getDiskStats();
  auto net_devs = proc_fs_.getNetDevs();
  auto process_stats = proc_fs_.getProcessStats();
  // Computed against the previous baseline, before it is replaced below.
  auto cpu_period = getAggregateCpuDiff(cpu_stats, last_cpu_stats_);

  // On scope exit (normal or via exception) the current sample becomes the baseline for the next trigger.
  auto store_as_baseline = gsl::finally([&] {
    refreshMembers(std::move(cpu_stats), std::move(disk_stats), std::move(net_devs), std::move(process_stats));
  });

  auto& allocator = root.GetAllocator();
  processCPUInformation(cpu_stats, body, allocator);
  processDiskInformation(disk_stats, body, allocator);
  processNetworkInformation(net_devs, body, allocator);
  processProcessInformation(process_stats, cpu_period, body, allocator);
  processMemoryInformation(body, allocator);

  // Writes the serialized document into the flow file and routes it to Success.
  const auto write_and_transfer = [&](auto& json_callback) {
    session->write(flow_file, std::ref(json_callback));
    session->transfer(flow_file, Success);
  };

  if (output_compactness_ == OutputCompactness::Pretty) {
    utils::PrettyJsonOutputCallback callback(std::move(root), decimal_places_);
    write_and_transfer(callback);
  } else if (output_compactness_ == OutputCompactness::Compact) {
    utils::JsonOutputCallback callback(std::move(root), decimal_places_);
    write_and_transfer(callback);
  } else {
    throw Exception(GENERAL_EXCEPTION, "Invalid output compactness");
  }
}
// Returns the JSON value that the metric sections should be added to:
// the document root for plain JSON, or the "Body" member of an OpenTelemetry log record.
// Throws for an unknown output format.
rapidjson::Value& ProcFsMonitor::prepareJSONBody(rapidjson::Document& root) {
  if (output_format_ == OutputFormat::JSON)
    return root;
  if (output_format_ != OutputFormat::OpenTelemetry)
    throw Exception(GENERAL_EXCEPTION, "Invalid output format");

  utils::OpenTelemetryLogDataModel::appendEventInformation(root, "PerformanceData");
  utils::OpenTelemetryLogDataModel::appendHostInformation(root);
  utils::OpenTelemetryLogDataModel::appendBody(root);
  return root["Body"];
}
// Reads the Decimal Places property into decimal_places_.
// An unset or empty property disables rounding (std::nullopt). Otherwise the value must be an
// integer in [1, 255] with no trailing characters; anything else throws PROCESS_SCHEDULE_EXCEPTION.
void ProcFsMonitor::setupDecimalPlacesFromProperties(const core::ProcessContext& context) {
  std::string decimal_places_str;
  if (!context.getProperty(DecimalPlaces, decimal_places_str) || decimal_places_str.empty()) {
    decimal_places_ = std::nullopt;
    return;
  }

  // Only std::stoul runs inside the try block. The range check below must stay outside it:
  // otherwise its own exception would be caught and replaced by the generic "invalid" error.
  std::optional<unsigned long> parsed_value;
  try {
    size_t consumed_chars = 0;
    const unsigned long value = std::stoul(decimal_places_str, &consumed_chars);
    // Reject partially parsed input such as "2.5" or "3abc", which std::stoul would silently truncate.
    if (consumed_chars == decimal_places_str.size())
      parsed_value = value;
  } catch (const std::exception&) {
    // std::invalid_argument / std::out_of_range: reported below through the empty parsed_value.
  }

  if (!parsed_value) {
    logger_->log_error("ProcFsMonitor Decimal Places property is invalid or out of range: %s", decimal_places_str);
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "ProcFsMonitor Decimal Places property is invalid or out of range");
  }
  if (*parsed_value == 0 || *parsed_value > std::numeric_limits<uint8_t>::max()) {
    logger_->log_error("ProcFsMonitor Decimal Places property is zero or too large: %s", decimal_places_str);
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "ProcFsMonitor Decimal Places property is zero or too large");
  }

  decimal_places_ = gsl::narrow<uint8_t>(*parsed_value);
  logger_->log_trace("Rounding is enabled with %d decimal places", decimal_places_.value());
}
namespace {
// Adds a "CPU" object containing the absolute counters of every CPU entry; skipped when nothing was added.
void processAbsoluteCPUInformation(const std::vector<std::pair<std::string, CpuStatData>>& current_cpu_stats,
                                   rapidjson::Value& body,
                                   rapidjson::Document::AllocatorType& alloc) {
  rapidjson::Value cpu_root{rapidjson::kObjectType};
  for (const auto& entry : current_cpu_stats) {
    addCPUStatToJson(entry.first, entry.second, cpu_root, alloc);
  }
  if (cpu_root.ObjectEmpty())
    return;
  body.AddMember("CPU", cpu_root.Move(), alloc);
}
void processRelativeCPUInformation(const std::vector<std::pair<std::string, CpuStatData>>& current_cpu_stats,
const std::vector<std::pair<std::string, CpuStatData>>& last_cpu_stats,
rapidjson::Value& body,
rapidjson::Document::AllocatorType& alloc) {
if (last_cpu_stats.size() != current_cpu_stats.size())
return;
rapidjson::Value cpu_root{rapidjson::kObjectType};
for (size_t i = 0; i < current_cpu_stats.size(); ++i) {
const auto& [cpu_name, cpu_stat] = current_cpu_stats[i];
const auto& [last_cpu_name, last_cpu_stat] = last_cpu_stats[i];
gsl_Expects(last_cpu_name == cpu_name);
if (cpu_stat.getTotal() > last_cpu_stat.getTotal())
addCPUStatPeriodToJson(cpu_name, cpu_stat, last_cpu_stat, cpu_root, alloc);
}
if (!cpu_root.ObjectEmpty())
body.AddMember("CPU", cpu_root.Move(), alloc);
}
// Adds a "Network" object with the absolute counters of every interface; skipped when nothing was added.
void processAbsoluteNetworkInformation(const std::vector<std::pair<std::string, NetDevData>>& current_net_devs,
                                       rapidjson::Value& body,
                                       rapidjson::Document::AllocatorType& alloc) {
  rapidjson::Value network_root{rapidjson::kObjectType};
  for (const auto& entry : current_net_devs) {
    addNetDevToJson(entry.first, entry.second, network_root, alloc);
  }
  if (network_root.ObjectEmpty())
    return;
  body.AddMember("Network", network_root.Move(), alloc);
}
// Adds a "Network" object with per-second rates for interfaces present in both samples.
// Requires a previous trigger time and a positive elapsed time; interfaces that are new
// since the previous sample are skipped.
void processRelativeNetworkInformation(const std::vector<std::pair<std::string, NetDevData>>& current_net_devs,
                                       const std::vector<std::pair<std::string, NetDevData>>& last_net_devs,
                                       const std::optional<std::chrono::steady_clock::time_point> last_trigger,
                                       rapidjson::Value& body,
                                       rapidjson::Document::AllocatorType& alloc) {
  if (!last_trigger.has_value())
    return;
  std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - last_trigger.value();
  if (elapsed <= 0ms)
    return;

  rapidjson::Value network_root{rapidjson::kObjectType};
  for (const auto& entry : current_net_devs) {
    const std::string& interface_name = entry.first;
    const auto previous = std::find_if(last_net_devs.begin(), last_net_devs.end(),
                                       [&interface_name](const auto& candidate) { return candidate.first == interface_name; });
    if (previous != last_net_devs.end()) {
      addNetDevPerSecToJson(interface_name, entry.second - previous->second, elapsed, network_root, alloc);
    }
  }
  if (network_root.ObjectEmpty())
    return;
  body.AddMember("Network", network_root.Move(), alloc);
}
// Adds a "Disk" object with the absolute counters of every disk; skipped when nothing was added.
void processAbsoluteDiskInformation(const std::vector<std::pair<std::string, DiskStatData>>& current_disk_stats,
                                    rapidjson::Value& body,
                                    rapidjson::Document::AllocatorType& alloc) {
  rapidjson::Value disk_root{rapidjson::kObjectType};
  for (const auto& entry : current_disk_stats) {
    addDiskStatToJson(entry.first, entry.second, disk_root, alloc);
  }
  if (disk_root.ObjectEmpty())
    return;
  body.AddMember("Disk", disk_root.Move(), alloc);
}
// Adds a "Disk" object with per-second rates for disks present in both samples.
// Requires a previous trigger time and a positive elapsed time; disks that are new
// since the previous sample are skipped.
void processRelativeDiskInformation(const std::vector<std::pair<std::string, DiskStatData>>& current_disk_stats,
                                    const std::vector<std::pair<std::string, DiskStatData>>& last_disk_stats,
                                    const std::optional<std::chrono::steady_clock::time_point> last_trigger,
                                    rapidjson::Value& body,
                                    rapidjson::Document::AllocatorType& alloc) {
  if (!last_trigger.has_value())
    return;
  std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - last_trigger.value();
  if (elapsed <= 0ms)
    return;

  rapidjson::Value disk_root{rapidjson::kObjectType};
  for (const auto& entry : current_disk_stats) {
    const std::string& disk_name = entry.first;
    const auto previous = std::find_if(last_disk_stats.begin(), last_disk_stats.end(),
                                       [&disk_name](const auto& candidate) { return candidate.first == disk_name; });
    if (previous != last_disk_stats.end()) {
      addDiskStatPerSecToJson(disk_name, entry.second - previous->second, elapsed, disk_root, alloc);
    }
  }
  if (disk_root.ObjectEmpty())
    return;
  body.AddMember("Disk", disk_root.Move(), alloc);
}
// Adds a "Process" object keyed by pid (as a string) with each process's absolute stats;
// skipped when nothing was added.
void processAbsoluteProcessInformation(const std::map<pid_t, ProcessStat>& current_process_stats,
                                       rapidjson::Value& body,
                                       rapidjson::Document::AllocatorType& alloc) {
  rapidjson::Value process_root{rapidjson::kObjectType};
  for (const auto& entry : current_process_stats)
    addProcessStatToJson(std::to_string(entry.first), entry.second, process_root, alloc);
  if (process_root.ObjectEmpty())
    return;
  body.AddMember("Process", process_root.Move(), alloc);
}
void processRelativeProcessInformation(const std::map<pid_t, ProcessStat>& current_process_stats,
const std::map<pid_t, ProcessStat>& last_process_stats,
std::optional<std::chrono::duration<double>> last_cpu_period,
rapidjson::Value& body,
rapidjson::Document::AllocatorType& alloc) {
rapidjson::Value process_root{rapidjson::kObjectType};
if (!last_cpu_period || *last_cpu_period <= 0s)
return;
for (const auto& [pid, process_stat] : current_process_stats) {
if (last_process_stats.contains(pid) && last_process_stats.at(pid).getComm() == process_stat.getComm()) {
addNormalizedProcessStatToJson(std::to_string(pid), last_process_stats.at(pid), process_stat, *last_cpu_period, process_root, alloc);
}
}
if (!process_root.ObjectEmpty())
body.AddMember("Process", process_root.Move(), alloc);
}
} // namespace
// Emits CPU metrics (absolute or relative per result_relativeness_) if the sample is valid.
// Throws for an unknown relativeness setting.
void ProcFsMonitor::processCPUInformation(const std::vector<std::pair<std::string, CpuStatData>>& current_cpu_stats,
                                          rapidjson::Value& body,
                                          rapidjson::Document::AllocatorType& alloc) {
  if (!cpu_stats_are_valid(current_cpu_stats))
    return;
  if (result_relativeness_ == ResultRelativeness::Absolute) {
    processAbsoluteCPUInformation(current_cpu_stats, body, alloc);
    return;
  }
  if (result_relativeness_ != ResultRelativeness::Relative)
    throw Exception(GENERAL_EXCEPTION, "Invalid result relativeness");
  processRelativeCPUInformation(current_cpu_stats, last_cpu_stats_, body, alloc);
}
// Emits disk metrics (absolute or relative per result_relativeness_) when any disk was sampled.
// Throws for an unknown relativeness setting.
void ProcFsMonitor::processDiskInformation(const std::vector<std::pair<std::string, DiskStatData>>& current_disk_stats,
                                           rapidjson::Value& body,
                                           rapidjson::Document::AllocatorType& alloc) {
  if (current_disk_stats.empty())
    return;
  if (result_relativeness_ == ResultRelativeness::Absolute) {
    processAbsoluteDiskInformation(current_disk_stats, body, alloc);
    return;
  }
  if (result_relativeness_ != ResultRelativeness::Relative)
    throw Exception(GENERAL_EXCEPTION, "Invalid result relativeness");
  processRelativeDiskInformation(current_disk_stats, last_disk_stats_, last_trigger_, body, alloc);
}
// Emits network metrics (absolute or relative per result_relativeness_) when any interface was sampled.
// Throws for an unknown relativeness setting.
void ProcFsMonitor::processNetworkInformation(const std::vector<std::pair<std::string, NetDevData>>& current_net_devs,
                                              rapidjson::Value& body,
                                              rapidjson::Document::AllocatorType& alloc) {
  if (current_net_devs.empty())
    return;
  if (result_relativeness_ == ResultRelativeness::Absolute) {
    processAbsoluteNetworkInformation(current_net_devs, body, alloc);
    return;
  }
  if (result_relativeness_ != ResultRelativeness::Relative)
    throw Exception(GENERAL_EXCEPTION, "Invalid result relativeness");
  processRelativeNetworkInformation(current_net_devs, last_net_devs_, last_trigger_, body, alloc);
}
// Emits per-process metrics (absolute or relative per result_relativeness_) when any process was sampled.
// The absolute/relative helpers add the "Process" member to body themselves; the previously present
// local process_root was never populated, so its trailing AddMember could never fire and was removed.
// Throws for an unknown relativeness setting.
void ProcFsMonitor::processProcessInformation(const std::map<pid_t, ProcessStat>& current_process_stats,
                                              std::optional<std::chrono::duration<double>> last_cpu_period,
                                              rapidjson::Value& body,
                                              rapidjson::Document::AllocatorType& alloc) {
  if (current_process_stats.empty())
    return;
  if (result_relativeness_ == ResultRelativeness::Relative)
    processRelativeProcessInformation(current_process_stats, last_process_stats_, last_cpu_period, body, alloc);
  else if (result_relativeness_ == ResultRelativeness::Absolute)
    processAbsoluteProcessInformation(current_process_stats, body, alloc);
  else
    throw Exception(GENERAL_EXCEPTION, "Invalid result relativeness");
}
// Adds a "Memory" object built from the current memory info, if it could be read.
void ProcFsMonitor::processMemoryInformation(rapidjson::Value& body, rapidjson::Document::AllocatorType& alloc) {
  auto mem_info = proc_fs_.getMemInfo();
  if (!mem_info)
    return;
  rapidjson::Value memory_root{rapidjson::kObjectType};
  addMemInfoToJson(*mem_info, memory_root, alloc);
  body.AddMember("Memory", memory_root.Move(), alloc);
}
// Stores the given sample as the baseline for the next relative computation and
// records the current time as the last trigger time.
void ProcFsMonitor::refreshMembers(std::vector<std::pair<std::string, CpuStatData>>&& current_cpu_stats,
                                   std::vector<std::pair<std::string, DiskStatData>>&& current_disk_stats,
                                   std::vector<std::pair<std::string, NetDevData>>&& current_net_devs,
                                   std::map<pid_t, ProcessStat>&& current_process_stats) {
  last_cpu_stats_ = std::move(current_cpu_stats);
  last_disk_stats_ = std::move(current_disk_stats);
  last_net_devs_ = std::move(current_net_devs);
  last_process_stats_ = std::move(current_process_stats);
  last_trigger_ = std::chrono::steady_clock::now();
}
// Registers ProcFsMonitor as a Processor resource (macro presumably provided by core/Resource.h).
REGISTER_RESOURCE(ProcFsMonitor, Processor);
} // namespace org::apache::nifi::minifi::extensions::procfs