libminifi/src/core/ProcessorMetrics.cpp (121 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "core/ProcessorMetrics.h"

#include <cctype>
#include <utility>

#include "core/Processor.h"
#include "utils/gsl.h"
#include "range/v3/numeric/accumulate.hpp"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::core {

// Binds the metrics object to the processor it reports on and sizes both runtime
// averagers to keep at most STORED_ON_TRIGGER_RUNTIME_COUNT samples.
ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
    : source_processor_(source_processor),
      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT),
      session_commit_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
}

// Returns the metric class name: the processor type with a "Metrics" suffix.
std::string ProcessorMetrics::getName() const {
  return source_processor_.getProcessorType() + "Metrics";
}

// Returns the labels attached to every published metric of this processor:
// the metric class, the processor's name and the processor's UUID.
std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const {
  return {
    {"metric_class", getName()},
    {"processor_name", source_processor_.getName()},
    {"processor_uuid", source_processor_.getUUIDStr()}
  };
}

// Builds a single response node named after the processor UUID. Its children are the
// fixed counters/runtimes followed by one "TransferredTo<Relationship>" node for each
// relationship recorded through increaseRelationshipTransferCount().
std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() {
  std::vector<state::response::SerializedResponseNode> resp;

  state::response::SerializedResponseNode root_node {
    .name = source_processor_.getUUIDStr(),
    .children = {
      {.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(iterations.load())},
      {.name = "AverageOnTriggerRunTime", .value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
      {.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
      {.name = "AverageSessionCommitRunTime", .value = static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
      {.name = "LastSessionCommitRunTime", .value = static_cast<uint64_t>(getLastSessionCommitRuntime().count())},
      {.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferred_flow_files.load())},
      {.name = "TransferredBytes", .value = transferred_bytes.load()}
    }
  };

  {
    std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
    for (const auto& [relationship, count] : transferred_relationships_) {
      gsl_Expects(!relationship.empty());
      // std::toupper has undefined behaviour for negative char values, so the first
      // character goes through unsigned char before being upper-cased.
      const auto capitalized_first_char = static_cast<char>(std::toupper(static_cast<unsigned char>(relationship[0])));
      state::response::SerializedResponseNode transferred_to_relationship_node;
      transferred_to_relationship_node.name = std::string("TransferredTo").append(1, capitalized_first_char).append(relationship.substr(1));
      transferred_to_relationship_node.value = static_cast<uint32_t>(count);
      root_node.children.push_back(std::move(transferred_to_relationship_node));
    }
  }

  resp.push_back(std::move(root_node));
  return resp;
}

// Produces the flat list of published metrics: the fixed counters/runtimes followed by
// one "transferred_to_<relationship>" entry per recorded relationship. Every entry
// carries the same label set as returned by getCommonLabels().
std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
  // Computed once and reused so that all entries, including the per-relationship ones,
  // share a single consistent set of labels.
  const auto common_labels = getCommonLabels();

  std::vector<state::PublishedMetric> metrics = {
    {"onTrigger_invocations", static_cast<double>(iterations.load()), common_labels},
    {"average_onTrigger_runtime_milliseconds", static_cast<double>(getAverageOnTriggerRuntime().count()), common_labels},
    {"last_onTrigger_runtime_milliseconds", static_cast<double>(getLastOnTriggerRuntime().count()), common_labels},
    {"average_session_commit_runtime_milliseconds", static_cast<double>(getAverageSessionCommitRuntime().count()), common_labels},
    {"last_session_commit_runtime_milliseconds", static_cast<double>(getLastSessionCommitRuntime().count()), common_labels},
    {"transferred_flow_files", static_cast<double>(transferred_flow_files.load()), common_labels},
    {"transferred_bytes", static_cast<double>(transferred_bytes.load()), common_labels}
  };

  {
    std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
    for (const auto& [relationship, count] : transferred_relationships_) {
      metrics.push_back({"transferred_to_" + relationship, static_cast<double>(count), common_labels});
    }
  }

  return metrics;
}

// Adds `count` to the transfer counter of `relationship` while holding
// transferred_relationships_mutex_; a relationship seen for the first time is created by operator[].
void ProcessorMetrics::increaseRelationshipTransferCount(const std::string& relationship, size_t count) {
  std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
  transferred_relationships_[relationship] += count;
}

// Average of the stored onTrigger runtimes (default-constructed value if none was recorded).
std::chrono::milliseconds ProcessorMetrics::getAverageOnTriggerRuntime() const {
  return on_trigger_runtime_averager_.getAverage();
}

// Records the runtime of the most recent onTrigger call.
void ProcessorMetrics::addLastOnTriggerRuntime(std::chrono::milliseconds runtime) {
  on_trigger_runtime_averager_.addValue(runtime);
}

// Most recently recorded onTrigger runtime (default-constructed value if none was recorded).
std::chrono::milliseconds ProcessorMetrics::getLastOnTriggerRuntime() const {
  return on_trigger_runtime_averager_.getLastValue();
}

// Average of the stored session commit runtimes (default-constructed value if none was recorded).
std::chrono::milliseconds ProcessorMetrics::getAverageSessionCommitRuntime() const {
  return session_commit_runtime_averager_.getAverage();
}

// Records the runtime of the most recent session commit.
void ProcessorMetrics::addLastSessionCommitRuntime(std::chrono::milliseconds runtime) {
  session_commit_runtime_averager_.addValue(runtime);
}

// Most recently recorded session commit runtime (default-constructed value if none was recorded).
std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() const {
  return session_commit_runtime_averager_.getLastValue();
}

// Returns the sum of the stored samples divided by their number, or a
// value-initialized ValueType when no sample has been stored yet.
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
  std::lock_guard<std::mutex> lock(average_value_mutex_);
  if (values_.empty()) {
    return {};
  }
  return ranges::accumulate(values_, ValueType{}) / values_.size();
}

// Stores a new sample. Samples are appended until SAMPLE_SIZE_ of them are held;
// after that each new sample overwrites the slot at next_average_index_, which
// wraps back to 0 once it runs past the end of the buffer.
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
void ProcessorMetrics::Averager<ValueType>::addValue(ValueType runtime) {
  std::lock_guard<std::mutex> lock(average_value_mutex_);
  if (SAMPLE_SIZE_ == 0) {
    // A zero-capacity window has no slot to write to; without this guard the
    // overwrite branch below would index into an empty vector.
    return;
  }

  if (values_.size() < SAMPLE_SIZE_) {
    values_.push_back(runtime);
    return;
  }

  if (next_average_index_ >= values_.size()) {
    next_average_index_ = 0;
  }
  values_[next_average_index_] = runtime;
  ++next_average_index_;
}

// Returns the sample stored by the latest addValue() call, or a value-initialized
// ValueType when no sample has been stored yet.
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
ValueType ProcessorMetrics::Averager<ValueType>::getLastValue() const {
  std::lock_guard<std::mutex> lock(average_value_mutex_);
  if (values_.empty()) {
    return {};
  }

  // While the buffer is still being filled the newest sample is the last element.
  // The same holds when next_average_index_ is 0: addValue() never leaves the index
  // at 0 after an overwrite, so 0 means no slot has been overwritten yet (this relies
  // on the index starting at 0 - NOTE(review): confirm in the header). Indexing with
  // next_average_index_ - 1 in that state would underflow and read out of bounds.
  if (values_.size() < SAMPLE_SIZE_ || next_average_index_ == 0) {
    return values_.back();
  }

  return values_[next_average_index_ - 1];
}

}  // namespace org::apache::nifi::minifi::core