extensions/standard-processors/processors/RouteText.cpp (318 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RouteText.h"
#include <algorithm>
#include <map>
#include <vector>
#include <utility>
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "io/StreamPipe.h"
#include "logging/LoggerConfiguration.h"
#include "range/v3/view/transform.hpp"
#include "range/v3/range/conversion.hpp"
#include "range/v3/view/tail.hpp"
#include "range/v3/view/join.hpp"
#include "utils/ProcessorConfigUtils.h"
#include "utils/OptionalUtils.h"
#include "utils/Searcher.h"
namespace org::apache::nifi::minifi::processors {
RouteText::RouteText(std::string name, const utils::Identifier& uuid)
: core::Processor(std::move(name), uuid), logger_(core::logging::LoggerFactory<RouteText>::getLogger(uuid)) {}
void RouteText::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Reads the routing/matching/segmentation strategies and their options from the process context.
 * The grouping regex is compiled here (once per schedule) and cleared when the property is absent.
 */
void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) {
  gsl_Expects(context);
  auto& ctx = *context;

  routing_ = utils::parseEnumProperty<route_text::Routing>(ctx, RoutingStrategy);
  matching_ = utils::parseEnumProperty<route_text::Matching>(ctx, MatchingStrategy);
  ctx.getProperty(TrimWhitespace, trim_);

  const bool ignore_case = ctx.getProperty<bool>(IgnoreCase).value_or(false);
  if (ignore_case) {
    case_policy_ = route_text::CasePolicy::IGNORE_CASE;
  } else {
    case_policy_ = route_text::CasePolicy::CASE_SENSITIVE;
  }

  if (auto grouping_pattern = ctx.getProperty(GroupingRegex)) {
    group_regex_ = utils::Regex(*grouping_pattern);
  } else {
    group_regex_.reset();
  }

  segmentation_ = utils::parseEnumProperty<route_text::Segmentation>(ctx, SegmentationStrategy);
  ctx.getProperty(GroupingFallbackValue, group_fallback_);
}
/**
 * Input-stream callback that reads the whole flow file content into memory and passes it,
 * segment by segment, to the supplied function.
 *
 * FULL_TEXT yields a single segment with index 0. PER_LINE yields one segment per line,
 * 1-based, where each segment keeps its trailing '\n' (if any).
 */
class RouteText::ReadCallback {
  using Fn = std::function<void(Segment)>;

 public:
  ReadCallback(route_text::Segmentation segmentation, size_t file_size, Fn&& fn)
      : segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {}

  /// @return the number of bytes processed, or -1 if the stream reported an error
  /// @throws Exception if fewer than file_size_ bytes could be read, or on an unknown segmentation strategy
  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const {
    std::vector<std::byte> buffer(file_size_);
    const size_t bytes_read = stream->read(buffer);
    if (io::isError(bytes_read)) {
      return -1;
    }
    if (bytes_read != file_size_) {
      throw Exception(PROCESS_SESSION_EXCEPTION, "Couldn't read whole flowfile content");
    }
    const std::string_view content{reinterpret_cast<const char*>(buffer.data()), buffer.size()};

    if (segmentation_ == route_text::Segmentation::FULL_TEXT) {
      fn_({content, 0});
      return gsl::narrow<int64_t>(content.length());
    }

    if (segmentation_ == route_text::Segmentation::PER_LINE) {
      // 1-based index as in nifi
      size_t line_no = 1;
      std::string_view remaining = content;
      while (!remaining.empty()) {
        // include the newline character in the segment to be in-line with nifi semantics
        const auto newline_pos = remaining.find('\n');
        const auto line_length = (newline_pos == std::string_view::npos) ? remaining.size() : newline_pos + 1;
        fn_({remaining.substr(0, line_length), line_no});
        remaining.remove_prefix(line_length);
        ++line_no;
      }
      return gsl::narrow<int64_t>(content.length());
    }

    throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy");
  }

 private:
  route_text::Segmentation segmentation_;
  size_t file_size_;
  Fn fn_;
};
/**
 * Per-flow-file helper that evaluates RouteText's dynamic properties against one flow file and
 * caches the results, keyed by property name, as plain strings, compiled regexes or substring
 * searchers. Each representation of a property is evaluated at most once per MatchingContext.
 *
 * The data members are public because RouteText::matchSegment reads process_context_ and flow_file_ directly.
 */
class RouteText::MatchingContext {
  // Character hash consistent with CaseAwareEq: under IGNORE_CASE, characters differing only in case hash equally.
  struct CaseAwareHash {
    explicit CaseAwareHash(route_text::CasePolicy policy): policy_(policy) {}
    size_t operator()(char ch) const {
      if (policy_ != route_text::CasePolicy::CASE_SENSITIVE) {
        return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch)));
      }
      return static_cast<size_t>(ch);
    }

   private:
    route_text::CasePolicy policy_;
  };

  // Character equality that ignores case unless the policy is CASE_SENSITIVE.
  struct CaseAwareEq {
    explicit CaseAwareEq(route_text::CasePolicy policy): policy_(policy) {}
    bool operator()(char lhs, char rhs) const {
      return policy_ == route_text::CasePolicy::CASE_SENSITIVE
          ? lhs == rhs
          : std::tolower(static_cast<unsigned char>(lhs)) == std::tolower(static_cast<unsigned char>(rhs));
    }

   private:
    route_text::CasePolicy policy_;
  };

 public:
  MatchingContext(core::ProcessContext& process_context, std::shared_ptr<core::FlowFile> flow_file, route_text::CasePolicy case_policy)
      : process_context_(process_context),
        flow_file_(std::move(flow_file)),
        case_policy_(case_policy) {}

  /// Returns the property's value compiled as a regex (with ICASE under IGNORE_CASE), cached by property name.
  /// @throws Exception (PROCESSOR_EXCEPTION) if the dynamic property cannot be evaluated
  const utils::Regex& getRegexProperty(const core::Property& prop) {
    if (const auto cached = regex_values_.find(prop.getName()); cached != regex_values_.end()) {
      return cached->second;
    }
    const std::string pattern = evaluateDynamicProperty(prop);
    std::vector<utils::Regex::Mode> flags;
    if (case_policy_ == route_text::CasePolicy::IGNORE_CASE) {
      flags.push_back(utils::Regex::Mode::ICASE);
    }
    return (regex_values_[prop.getName()] = utils::Regex(pattern, flags));
  }

  /// Returns the evaluated property value, cached by property name.
  /// @throws Exception (PROCESSOR_EXCEPTION) if the dynamic property cannot be evaluated
  const std::string& getStringProperty(const core::Property& prop) {
    if (const auto cached = string_values_.find(prop.getName()); cached != string_values_.end()) {
      return cached->second;
    }
    return string_values_.emplace(prop.getName(), evaluateDynamicProperty(prop)).first->second;
  }

  /// Returns a case-policy-aware searcher for the evaluated property value, cached by property name.
  /// @throws Exception (PROCESSOR_EXCEPTION) if the dynamic property cannot be evaluated
  const auto& getSearcher(const core::Property& prop) {
    if (const auto cached = searcher_values_.find(prop.getName()); cached != searcher_values_.end()) {
      return cached->second.searcher_;
    }
    return searcher_values_.try_emplace(prop.getName(), evaluateDynamicProperty(prop), case_policy_).first->second.searcher_;
  }

  core::ProcessContext& process_context_;
  std::shared_ptr<core::FlowFile> flow_file_;
  route_text::CasePolicy case_policy_;
  std::map<std::string, std::string> string_values_;
  std::map<std::string, utils::Regex> regex_values_;

  // Owns the pattern text together with the searcher built over iterators into str_. Copying and
  // moving are disabled so str_ never relocates underneath the searcher (which presumably retains
  // those iterators); instances are therefore constructed in place inside the map.
  struct OwningSearcher {
    OwningSearcher(std::string str, route_text::CasePolicy case_policy)
        : str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {}
    OwningSearcher(const OwningSearcher&) = delete;
    OwningSearcher(OwningSearcher&&) = delete;
    OwningSearcher& operator=(const OwningSearcher&) = delete;
    OwningSearcher& operator=(OwningSearcher&&) = delete;

    std::string str_;
    utils::Searcher<std::string::const_iterator, CaseAwareHash, CaseAwareEq> searcher_;
  };
  std::map<std::string, OwningSearcher> searcher_values_;

 private:
  // Evaluates `prop` against flow_file_; throws PROCESSOR_EXCEPTION when the context reports failure.
  std::string evaluateDynamicProperty(const core::Property& prop) {
    std::string value;
    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {
      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'");
    }
    return value;
  }
};
namespace {
/// Destination of a routed segment: the target relationship plus an optional group name.
/// Used as the key of the per-flow-file output map, hence the strict weak ordering below.
struct Route {
  core::Relationship relationship_;
  std::optional<std::string> group_name_;

  // lexicographic ordering on (relationship_, group_name_)
  bool operator<(const Route& other) const {
    if (relationship_ < other.relationship_) {
      return true;
    }
    if (other.relationship_ < relationship_) {
      return false;
    }
    return group_name_ < other.group_name_;
  }
};
}  // namespace
/**
 * Splits the incoming flow file into segments, routes each segment according to the configured
 * routing and matching strategies, and emits one new flow file per (relationship, group) pair.
 * Each new flow file holds the concatenated original (unpreprocessed) text of its segments.
 * The incoming flow file is always transferred to Original.
 */
void RouteText::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
  gsl_Expects(context && session);
  auto flow_file = session->get();
  if (!flow_file) {
    context->yield();
    return;
  }

  // accumulated original segment text per destination
  std::map<Route, std::string> flow_file_contents;

  MatchingContext matching_context(*context, flow_file, case_policy_);

  ReadCallback callback(segmentation_, flow_file->getSize(), [&] (Segment segment) {
    const std::string_view original_value = segment.value_;
    const std::string_view preprocessed_value = preprocess(segment.value_);
    if (matching_ != route_text::Matching::EXPRESSION) {
      // an Expression has access to the raw segment like in nifi
      // all others use the preprocessed_value
      segment.value_ = preprocessed_value;
    }
    // group extraction always uses the preprocessed value
    const auto group = getGroup(preprocessed_value);
    const auto matches = [&] (const auto& property_entry) {
      return matchSegment(matching_context, segment, property_entry.second);
    };
    switch (routing_) {
      case route_text::Routing::ALL:
      case route_text::Routing::ANY: {
        const bool matched = (routing_ == route_text::Routing::ALL)
            ? std::all_of(dynamic_properties_.cbegin(), dynamic_properties_.cend(), matches)
            : std::any_of(dynamic_properties_.cbegin(), dynamic_properties_.cend(), matches);
        if (matched) {
          flow_file_contents[{Matched, group}] += original_value;
        } else {
          flow_file_contents[{Unmatched, group}] += original_value;
        }
        return;
      }
      case route_text::Routing::DYNAMIC: {
        bool routed = false;
        for (const auto& [property_name, prop] : dynamic_properties_) {
          if (matchSegment(matching_context, segment, prop)) {
            // Read-only lookup. operator[] is a mutating call on a member that may be shared by concurrent
            // onTrigger invocations, and it would silently route to a default-constructed relationship if
            // the entry were missing.
            flow_file_contents[{dynamic_relationships_.at(property_name), group}] += original_value;
            routed = true;
          }
        }
        if (!routed) {
          flow_file_contents[{Unmatched, group}] += original_value;
        }
        return;
      }
    }
    throw Exception(PROCESSOR_EXCEPTION, "Unknown routing strategy");
  });
  session->read(flow_file, std::move(callback));

  for (const auto& [route, content] : flow_file_contents) {
    auto new_flow_file = session->create(flow_file);
    if (route.group_name_) {
      new_flow_file->setAttribute(GROUP_ATTRIBUTE_NAME, route.group_name_.value());
    }
    session->writeBuffer(new_flow_file, content);
    session->transfer(new_flow_file, route.relationship_);
  }

  session->transfer(flow_file, Original);
}
/**
 * Normalizes a segment before matching and grouping.
 * In PER_LINE mode, every trailing '\r'/'\n' character is dropped. If trim_ is set, surrounding
 * whitespace is then trimmed via utils::StringUtils::trim.
 * The returned view points into `str` (or at an empty literal).
 */
std::string_view RouteText::preprocess(std::string_view str) const {
  std::string_view result = str;
  if (segmentation_ == route_text::Segmentation::PER_LINE) {
    // do not consider the trailing \r\n characters in order to conform to nifi
    const auto last_kept = result.find_last_not_of("\r\n");
    result = (last_kept == std::string_view::npos) ? std::string_view{""} : result.substr(0, last_kept + 1);
  }
  if (trim_) {
    result = utils::StringUtils::trim(result);
  }
  return result;
}
/**
 * Tests a single segment against one dynamic property using the configured matching strategy.
 * EXPRESSION evaluates the property with segment variables and interprets the result as a bool.
 * All other strategies compare against the (cached) property value from `context`.
 * @throws Exception (PROCESSOR_EXCEPTION) if the property cannot be evaluated or the strategy is unknown
 */
bool RouteText::matchSegment(MatchingContext& context, const Segment& segment, const core::Property& prop) const {
  const bool case_sensitive = case_policy_ == route_text::CasePolicy::CASE_SENSITIVE;
  switch (matching_) {
    case route_text::Matching::EXPRESSION: {
      const std::string segment_no = std::to_string(segment.idx_);
      std::map<std::string, std::string> variables{
        {"segment", std::string(segment.value_)},
        {"segmentNo", segment_no}
      };
      if (segmentation_ == route_text::Segmentation::PER_LINE) {
        // for nifi compatibility
        variables.emplace("line", std::string(segment.value_));
        variables.emplace("lineNo", segment_no);
      }
      std::string result;
      if (!context.process_context_.getDynamicProperty(prop, result, context.flow_file_, variables)) {
        throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'");
      }
      return utils::StringUtils::toBool(result).value_or(false);
    }
    case route_text::Matching::STARTS_WITH:
      return utils::StringUtils::startsWith(segment.value_, context.getStringProperty(prop), case_sensitive);
    case route_text::Matching::ENDS_WITH:
      return utils::StringUtils::endsWith(segment.value_, context.getStringProperty(prop), case_sensitive);
    case route_text::Matching::CONTAINS: {
      const auto& searcher = context.getSearcher(prop);
      return std::search(segment.value_.begin(), segment.value_.end(), searcher) != segment.value_.end();
    }
    case route_text::Matching::EQUALS:
      return utils::StringUtils::equals(segment.value_, context.getStringProperty(prop), case_sensitive);
    case route_text::Matching::CONTAINS_REGEX: {
      std::string subject(segment.value_);
      return utils::regexSearch(subject, context.getRegexProperty(prop));
    }
    case route_text::Matching::MATCHES_REGEX: {
      std::string subject(segment.value_);
      return utils::regexMatch(subject, context.getRegexProperty(prop));
    }
  }
  throw Exception(PROCESSOR_EXCEPTION, "Unknown matching strategy");
}
/**
 * Computes the group name of a segment from the grouping regex.
 * @return std::nullopt when no grouping regex is configured; group_fallback_ when the regex does not
 *         match the whole segment; otherwise the capture groups joined with ", "
 */
std::optional<std::string> RouteText::getGroup(const std::string_view& segment) const {
  if (!group_regex_) {
    return std::nullopt;
  }
  std::string subject(segment);
  utils::SMatch match_result;
  if (!utils::regexMatch(subject, match_result, group_regex_.value())) {
    return group_fallback_;
  }
  // only the capture groups (everything after the whole-match entry) are joined;
  // unused capturing groups default to empty string
  std::string group_name;
  bool first_capture = true;
  for (const auto& capture : ranges::views::tail(match_result)) {
    if (!first_capture) {
      group_name += ", ";
    }
    first_capture = false;
    const std::string capture_text = capture;
    group_name += capture_text;
  }
  return group_name;
}
/**
 * Records the new/changed dynamic property and rebuilds the supported relationship list.
 * The list consists of the static relationships plus one "Dynamic Route" relationship per dynamic
 * property, each named after its property.
 * Only the modified route is logged. Previously every existing route was re-logged as "registered"
 * on each modification, which produced duplicate, misleading log lines.
 */
void RouteText::onDynamicPropertyModified(const core::Property& /*orig_property*/, const core::Property& new_property) {
  dynamic_properties_[new_property.getName()] = new_property;

  // bind by reference: no need to copy the static relationship array
  const auto& static_relationships = RouteText::Relationships;
  std::vector<core::RelationshipDefinition> relationships(static_relationships.begin(), static_relationships.end());
  relationships.reserve(relationships.size() + dynamic_properties_.size());

  for (const auto& [property_name, prop] : dynamic_properties_) {
    const core::RelationshipDefinition rel{property_name, "Dynamic Route"};
    dynamic_relationships_[property_name] = rel;
    relationships.push_back(rel);
  }

  setSupportedRelationships(relationships);
  logger_->log_info("RouteText registered dynamic route '%s' with expression '%s'", new_property.getName(), new_property.getValue().to_string());
}
// Registers RouteText as a `Processor` resource via the REGISTER_RESOURCE macro (presumably defined in
// core/Resource.h and making the class instantiable by name — confirm against the macro definition).
REGISTER_RESOURCE(RouteText, Processor);
} // namespace org::apache::nifi::minifi::processors