extensions/standard-processors/processors/RouteText.cpp (332 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RouteText.h"
#include <algorithm>
#include <map>
#include <utility>
#include <vector>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/logging/LoggerFactory.h"
#include "io/StreamPipe.h"
#include "range/v3/range/conversion.hpp"
#include "range/v3/view/join.hpp"
#include "range/v3/view/tail.hpp"
#include "range/v3/view/transform.hpp"
#include "range/v3/algorithm/all_of.hpp"
#include "range/v3/algorithm/any_of.hpp"
#include "utils/OptionalUtils.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/Searcher.h"
namespace org::apache::nifi::minifi::processors {
RouteText::RouteText(std::string_view name, const utils::Identifier& uuid)
: core::ProcessorImpl(name, uuid), logger_(core::logging::LoggerFactory<RouteText>::getLogger(uuid)) {}
void RouteText::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void RouteText::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
routing_ = utils::parseEnumProperty<route_text::Routing>(context, RoutingStrategy);
matching_ = utils::parseEnumProperty<route_text::Matching>(context, MatchingStrategy);
trim_ = utils::parseBoolProperty(context, TrimWhitespace);
case_policy_ = utils::parseBoolProperty(context, IgnoreCase) ? route_text::CasePolicy::IGNORE_CASE : route_text::CasePolicy::CASE_SENSITIVE;
group_regex_ = context.getProperty(GroupingRegex) | utils::toOptional() | utils::transform([] (const auto& str) {return utils::Regex(str);});
segmentation_ = utils::parseEnumProperty<route_text::Segmentation>(context, SegmentationStrategy);
group_fallback_ = context.getProperty(GroupingFallbackValue).value_or("");
{
const auto static_relationships = RouteText::Relationships;
std::vector<core::RelationshipDefinition> relationships(static_relationships.begin(), static_relationships.end());
for (const auto& property_name : getDynamicPropertyKeys()) {
core::RelationshipDefinition rel{property_name, "Dynamic Route"};
dynamic_relationships_[property_name] = rel;
relationships.push_back(rel);
logger_->log_info("RouteText registered dynamic route '{}'", property_name);
}
setSupportedRelationships(relationships);
}
}
class RouteText::ReadCallback {
using Fn = std::function<void(Segment)>;
public:
ReadCallback(route_text::Segmentation segmentation, size_t file_size, Fn&& fn)
: segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {}
int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const {
std::vector<std::byte> buffer;
buffer.resize(file_size_);
size_t ret = stream->read(buffer);
if (io::isError(ret)) {
return -1;
}
if (ret != file_size_) {
throw Exception(PROCESS_SESSION_EXCEPTION, "Couldn't read whole flowfile content");
}
std::string_view content{reinterpret_cast<const char*>(buffer.data()), buffer.size()};
switch (segmentation_) {
case route_text::Segmentation::FULL_TEXT: {
fn_({content, 0});
return gsl::narrow<int64_t>(content.length());
}
case route_text::Segmentation::PER_LINE: {
// 1-based index as in nifi
size_t segment_idx = 1;
std::string_view::size_type curr = 0;
while (curr < content.length()) {
// find beginning of next line
std::string_view::size_type next_line = content.find('\n', curr);
if (next_line == std::string_view::npos) {
fn_({content.substr(curr), segment_idx});
} else {
// include newline character to be in-line with nifi semantics
++next_line;
fn_({content.substr(curr, next_line - curr), segment_idx});
}
curr = next_line;
++segment_idx;
}
return gsl::narrow<int64_t>(content.length());
}
}
throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy");
}
private:
route_text::Segmentation segmentation_;
size_t file_size_;
Fn fn_;
};
class RouteText::MatchingContext {
struct CaseAwareHash {
explicit CaseAwareHash(route_text::CasePolicy policy): policy_(policy) {}
size_t operator()(char ch) const {
if (policy_ == route_text::CasePolicy::CASE_SENSITIVE) {
return static_cast<size_t>(ch);
}
return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch)));
}
private:
route_text::CasePolicy policy_;
};
struct CaseAwareEq {
explicit CaseAwareEq(route_text::CasePolicy policy): policy_(policy) {}
bool operator()(char a, char b) const {
if (policy_ == route_text::CasePolicy::CASE_SENSITIVE) {
return a == b;
}
return std::tolower(static_cast<unsigned char>(a)) == std::tolower(static_cast<unsigned char>(b));
}
private:
route_text::CasePolicy policy_;
};
public:
MatchingContext(core::ProcessContext& process_context, std::shared_ptr<core::FlowFile> flow_file, route_text::CasePolicy case_policy)
: process_context_(process_context),
flow_file_(std::move(flow_file)),
case_policy_(case_policy) {}
const utils::Regex& getRegexProperty(const std::string& property_name) {
auto it = regex_values_.find(property_name);
if (it != regex_values_.end()) {
return it->second;
}
const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property");
std::vector<utils::Regex::Mode> flags;
if (case_policy_ == route_text::CasePolicy::IGNORE_CASE) {
flags.push_back(utils::Regex::Mode::ICASE);
}
return (regex_values_[property_name] = utils::Regex(value, flags));
}
const std::string& getStringProperty(const std::string& property_name) {
auto it = string_values_.find(property_name);
if (it != string_values_.end()) {
return it->second;
}
const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property");
return (string_values_[property_name] = value);
}
const auto& getSearcher(const std::string& property_name) {
auto it = searcher_values_.find(property_name);
if (it != searcher_values_.end()) {
return it->second.searcher_;
}
const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property");
return searcher_values_.emplace(
std::piecewise_construct, std::forward_as_tuple(property_name),
std::forward_as_tuple(value, case_policy_)).first->second.searcher_;
}
core::ProcessContext& process_context_;
std::shared_ptr<core::FlowFile> flow_file_;
route_text::CasePolicy case_policy_;
std::map<std::string, std::string> string_values_;
std::map<std::string, utils::Regex> regex_values_;
struct OwningSearcher {
OwningSearcher(std::string str, route_text::CasePolicy case_policy)
: str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {}
OwningSearcher(const OwningSearcher&) = delete;
OwningSearcher(OwningSearcher&&) = delete;
OwningSearcher& operator=(const OwningSearcher&) = delete;
OwningSearcher& operator=(OwningSearcher&&) = delete;
~OwningSearcher() = default;
std::string str_;
utils::Searcher<std::string::const_iterator, CaseAwareHash, CaseAwareEq> searcher_;
};
std::map<std::string, OwningSearcher> searcher_values_;
};
namespace {
struct Route {
core::Relationship relationship_;
std::optional<std::string> group_name_;
bool operator<(const Route& other) const {
return std::tie(relationship_, group_name_) < std::tie(other.relationship_, other.group_name_);
}
};
} // namespace
void RouteText::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
auto flow_file = session.get();
if (!flow_file) {
context.yield();
return;
}
std::map<Route, std::string> flow_file_contents;
MatchingContext matching_context(context, flow_file, case_policy_);
ReadCallback callback(segmentation_, flow_file->getSize(), [&] (Segment segment) {
std::string_view original_value = segment.value_;
std::string_view preprocessed_value = preprocess(segment.value_);
if (matching_ != route_text::Matching::EXPRESSION) {
// an Expression has access to the raw segment like in nifi
// all others use the preprocessed_value
segment.value_ = preprocessed_value;
}
// group extraction always uses the preprocessed
auto group = getGroup(preprocessed_value);
auto dynamic_property_keys = getDynamicPropertyKeys();
switch (routing_) {
case route_text::Routing::ALL: {
if (ranges::all_of(dynamic_property_keys, [&] (const auto& property_name) {
return matchSegment(matching_context, segment, property_name);
})) {
flow_file_contents[{Matched, group}] += original_value;
} else {
flow_file_contents[{Unmatched, group}] += original_value;
}
return;
}
case route_text::Routing::ANY: {
if (ranges::any_of(dynamic_property_keys, [&] (const auto& property_name) {
return matchSegment(matching_context, segment, property_name);
})) {
flow_file_contents[{Matched, group}] += original_value;
} else {
flow_file_contents[{Unmatched, group}] += original_value;
}
return;
}
case route_text::Routing::DYNAMIC: {
bool routed = false;
for (const auto& property_name : dynamic_property_keys) {
if (matchSegment(matching_context, segment, property_name)) {
flow_file_contents[{dynamic_relationships_[property_name], group}] += original_value;
routed = true;
}
}
if (!routed) {
flow_file_contents[{Unmatched, group}] += original_value;
}
return;
}
}
throw Exception(PROCESSOR_EXCEPTION, "Unknown routing strategy");
});
session.read(flow_file, std::move(callback));
for (const auto& [route, content] : flow_file_contents) {
auto new_flow_file = session.create(flow_file.get());
if (route.group_name_) {
new_flow_file->setAttribute(GROUP_ATTRIBUTE_NAME, route.group_name_.value());
}
session.writeBuffer(new_flow_file, content);
session.transfer(new_flow_file, route.relationship_);
}
session.transfer(flow_file, Original);
}
std::string_view RouteText::preprocess(std::string_view str) const {
if (segmentation_ == route_text::Segmentation::PER_LINE) {
// do not consider the trailing \r\n characters in order to conform to nifi
auto len = str.find_last_not_of("\r\n");
if (len != std::string_view::npos) {
str = str.substr(0, len + 1);
} else {
str = "";
}
}
if (trim_) {
str = utils::string::trim(str);
}
return str;
}
namespace {
std::optional<std::string> getDynamicPropertyWithOverrides(core::ProcessContext& context,
const std::string& property_name,
core::FlowFile& flow_file,
const std::map<std::string, std::string>& overrides) {
std::map<std::string, std::optional<std::string>> original_attributes;
for (const auto& [override_key, override_value] : overrides) {
original_attributes[override_key] = flow_file.getAttribute(override_key);
flow_file.setAttribute(override_key, override_value);
}
auto onExit = gsl::finally([&]{
for (const auto& attr : original_attributes) {
if (attr.second) {
flow_file.setAttribute(attr.first, attr.second.value());
} else {
flow_file.removeAttribute(attr.first);
}
}
});
return context.getDynamicProperty(property_name, &flow_file) | utils::toOptional();
}
} // namespace
bool RouteText::matchSegment(MatchingContext& context, const Segment& segment, const std::string& property_name) const {
switch (matching_) {
case route_text::Matching::EXPRESSION: {
std::map<std::string, std::string> variables;
variables["segment"] = segment.value_;
variables["segmentNo"] = std::to_string(segment.idx_);
if (segmentation_ == route_text::Segmentation::PER_LINE) {
// for nifi compatibility
variables["line"] = segment.value_;
variables["lineNo"] = std::to_string(segment.idx_);
}
if (const auto result = getDynamicPropertyWithOverrides(context.process_context_, property_name, *context.flow_file_, variables)) {
return utils::string::toBool(*result).value_or(false);
} else {
throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + property_name + "'");
}
}
case route_text::Matching::STARTS_WITH: {
return utils::string::startsWith(segment.value_, context.getStringProperty(property_name), case_policy_ == route_text::CasePolicy::CASE_SENSITIVE);
}
case route_text::Matching::ENDS_WITH: {
return utils::string::endsWith(segment.value_, context.getStringProperty(property_name), case_policy_ == route_text::CasePolicy::CASE_SENSITIVE);
}
case route_text::Matching::CONTAINS: {
return std::search(segment.value_.begin(), segment.value_.end(), context.getSearcher(property_name)) != segment.value_.end();
}
case route_text::Matching::EQUALS: {
return utils::string::equals(segment.value_, context.getStringProperty(property_name), case_policy_ == route_text::CasePolicy::CASE_SENSITIVE);
}
case route_text::Matching::CONTAINS_REGEX: {
std::string segment_str = std::string(segment.value_);
return utils::regexSearch(segment_str, context.getRegexProperty(property_name));
}
case route_text::Matching::MATCHES_REGEX: {
std::string segment_str = std::string(segment.value_);
return utils::regexMatch(segment_str, context.getRegexProperty(property_name));
}
}
throw Exception(PROCESSOR_EXCEPTION, "Unknown matching strategy");
}
std::optional<std::string> RouteText::getGroup(const std::string_view& segment) const {
if (!group_regex_) {
return std::nullopt;
}
utils::SMatch match_result;
std::string segment_str = std::string(segment);
if (!utils::regexMatch(segment_str, match_result, group_regex_.value())) {
return group_fallback_;
}
// unused capturing groups default to empty string
auto to_string = [] (const auto& submatch) -> std::string {return submatch;};
return ranges::views::tail(match_result) // only join the capture groups
| ranges::views::transform(to_string)
| ranges::views::join(std::string_view(", "))
| ranges::to<std::string>();
}
REGISTER_RESOURCE(RouteText, Processor);
} // namespace org::apache::nifi::minifi::processors