extensions/standard-processors/utils/JoltUtils.cpp (1,051 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "JoltUtils.h"
#include "rapidjson/error/en.h"
#include "Exception.h"
namespace org::apache::nifi::minifi::utils::jolt {
static bool isSpecialChar(char ch) {
static constexpr std::array SPECIAL_CHARS{'.', '[', ']', '$', '&', '@', '#', '*'};
return std::find(SPECIAL_CHARS.begin(), SPECIAL_CHARS.end(), ch) != SPECIAL_CHARS.end();
}
bool Spec::Template::check(std::string_view str) {
enum class State {
Plain,
Escaped
} state = State::Plain;
for (char ch : str) {
switch (state) {
case State::Plain: {
if (ch == '&') {
return true;
} else if (ch == '\\') {
state = State::Escaped;
}
break;
}
case State::Escaped: {
state = State::Plain;
break;
}
}
}
return false;
}
nonstd::expected<std::pair<Spec::Template, Spec::It>, std::string> Spec::Template::parse(It begin, It end) {
enum class State {
Plain,
Escaped,
Template, // &
SimpleIndex, // &1
CanonicalTemplate, // &(
ParentIndex, // &(1
NextIndex, // &(1,
MatchIndex // &(1,0
};
std::vector<std::string> fragments;
std::vector<std::pair<size_t, size_t>> references;
fragments.push_back({});
State state = State::Plain;
std::string target;
// go beyond the last char on purpose
auto ch_it = begin;
while (ch_it <= end) {
std::optional<char> ch;
if (ch_it < end) {
ch = *ch_it;
}
bool force_terminate = false;
switch (state) {
case State::Plain: {
if (ch == '\\') {
state = State::Escaped;
} else if (ch == '&') {
references.push_back({});
fragments.push_back({});
state = State::Template;
} else if (ch == ')' || ch == ']' || ch == '.' || ch == '[') {
force_terminate = true;
} else if (ch) {
fragments.back() += ch.value();
}
break;
}
case State::Escaped: {
if (!ch) {
return nonstd::make_unexpected("Unterminated escape sequence");
}
if (ch != '\\' && !isSpecialChar(ch.value())) {
return nonstd::make_unexpected(fmt::format("Unknown escape sequence in template '\\{}'", ch.value()));
}
fragments.back() += ch.value();
state = State::Plain;
break;
}
case State::Template: {
if (ch == '(') {
state = State::CanonicalTemplate;
} else if (ch && std::isdigit(static_cast<unsigned char>(ch.value()))) {
target.clear();
target += ch.value();
state = State::SimpleIndex;
} else {
state = State::Plain;
// reprocess this char in a different state
gsl_Expects(ch_it != begin);
--ch_it;
}
break;
}
case State::SimpleIndex: {
if (ch && std::isdigit(static_cast<unsigned char>(ch.value()))) {
target += ch.value();
} else {
references.back().first = std::stoi(target);
state = State::Plain;
// reprocess this char in a different state
gsl_Expects(ch_it != begin);
--ch_it;
}
break;
}
case State::CanonicalTemplate: {
if (ch && std::isdigit(static_cast<unsigned char>(ch.value()))) {
target.clear();
target += ch.value();
state = State::ParentIndex;
} else {
return nonstd::make_unexpected(fmt::format("Expected an index at {}", std::distance(begin, ch_it)));
}
break;
}
case State::ParentIndex: {
if (ch && std::isdigit(static_cast<unsigned char>(ch.value()))) {
target += ch.value();
} else if (ch == ',') {
references.back().first = std::stoi(target);
state = State::NextIndex;
} else if (ch == ')') {
references.back().first = std::stoi(target);
state = State::Plain;
} else {
return nonstd::make_unexpected(fmt::format("Invalid character at {}, expected digit, comma or close parenthesis", std::distance(begin, ch_it)));
}
break;
}
case State::NextIndex: {
if (ch && std::isdigit(static_cast<unsigned char>(ch.value()))) {
target.clear();
target += ch.value();
state = State::MatchIndex;
} else {
return nonstd::make_unexpected(fmt::format("Expected an index at {}", std::distance(begin, ch_it)));
}
break;
}
case State::MatchIndex: {
if (ch && std::isdigit(static_cast<unsigned char>(ch.value()))) {
target += ch.value();
} else if (ch == ')') {
references.back().second = std::stoi(target);
state = State::Plain;
} else {
return nonstd::make_unexpected(fmt::format("Invalid character at {}, expected digit or close parenthesis", std::distance(begin, ch_it)));
}
break;
}
}
if (force_terminate) {
break;
}
if (ch_it != end) {
++ch_it;
} else {
break;
}
}
gsl_Assert(state == State::Plain);
return std::pair<Template, It>{Template{std::move(fragments), std::move(references)}, ch_it};
}
bool Spec::Regex::check(std::string_view str) {
enum class State {
Plain,
Escaped
} state = State::Plain;
for (char ch : str) {
switch (state) {
case State::Plain: {
if (ch == '*') {
return true;
} else if (ch == '\\') {
state = State::Escaped;
}
break;
}
case State::Escaped: {
state = State::Plain;
break;
}
}
}
return false;
}
nonstd::expected<Spec::Regex, std::string> Spec::Regex::parse(std::string_view str) {
enum class State {
Plain,
Escaped
};
std::vector<std::string> fragments;
fragments.push_back({});
State state = State::Plain;
for (size_t idx = 0; idx <= str.size(); ++idx) {
std::optional<char> ch;
if (idx < str.size()) {
ch = str[idx];
}
switch (state) {
case State::Plain: {
if (ch == '\\') {
state = State::Escaped;
} else if (ch == '*') {
fragments.push_back({});
} else if (ch) {
fragments.back() += ch.value();
}
break;
}
case State::Escaped: {
if (!ch) {
return nonstd::make_unexpected("Unterminated escape sequence");
}
if (ch != '\\' && !isSpecialChar(ch.value())) {
return nonstd::make_unexpected(fmt::format("Unknown escape sequence in pattern '\\{}'", ch.value()));
}
fragments.back() += ch.value();
state = State::Plain;
break;
}
}
}
gsl_Assert(state == State::Plain);
return Regex{std::move(fragments)};
}
std::string Spec::Template::eval(const Context& ctx) const {
std::string res;
for (size_t idx = 0; idx + 1 < fragments.size(); ++idx) {
res += fragments.at(idx);
auto& ref = references.at(idx);
auto* target = ctx.find(ref.first);
if (!target) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Invalid reference to {} at {}", ref.first, ctx.path()));
}
if (target->matches.size() <= ref.second) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Could not find match {} in '{}' at {}", ref.second, target->matches.at(0), ctx.path()));
}
res += target->matches.at(ref.second);
}
res += fragments.back();
return res;
}
std::optional<std::vector<std::string_view>> Spec::Regex::match(std::string_view str) const {
std::vector<std::string_view> matches;
matches.push_back(str);
if (fragments.size() == 1) {
if (str == fragments.front()) {
return matches;
} else {
return std::nullopt;
}
}
// first fragment is at the beginning of the string
if (str.substr(0, fragments.front().size()) != fragments.front()) {
return std::nullopt;
}
auto it = str.begin() + fragments.front().size();
for (size_t idx = 1; idx + 1 < fragments.size(); ++idx) {
auto& frag = fragments[idx];
auto next_it = std::search(it, str.end(), frag.begin(), frag.end());
if (next_it == str.end() && !frag.empty()) {
return std::nullopt;
}
matches.push_back({it, next_it});
it = next_it + frag.size();
}
// last fragment is at the end of the string
if (gsl::narrow<size_t>(std::distance(it, str.end())) < fragments.back().size()) {
// not enough characters left
return std::nullopt;
}
auto next_it = std::next(str.rbegin(), gsl::narrow<decltype(str.rbegin())::difference_type>(fragments.back().size())).base();
if (std::string_view(next_it, str.end()) != fragments.back()) {
return std::nullopt;
}
matches.push_back({it, next_it});
return matches;
}
namespace {
nonstd::expected<std::pair<Spec::Destination, Spec::It>, std::string> parseDestination(const Spec::Context& ctx, Spec::It begin, Spec::It end);
Spec::Destinations parseDestinations(const Spec::Context& ctx, const rapidjson::Value& val);
Spec::Pattern::Value parseValue(const Spec::Context& ctx, const rapidjson::Value& val);
std::pair<size_t, size_t> parseKeyAccess(std::string_view str) {
enum class State {
Begin,
BeginRef,
PrimaryIndex,
BeginFirstIndex,
FirstIndex,
BeginSecondIndex,
SecondIndex,
End
} state = State::Begin;
std::string target;
std::pair<size_t, size_t> result{0, 0};
for (size_t idx = 0; idx <= str.size(); ++idx) {
std::optional<char> ch;
if (idx < str.size()) {
ch = str[idx];
}
switch (state) {
case State::Begin: {
if (ch != '$') {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected '$' in key access in '{}' at {}", str, idx));
}
state = State::BeginRef;
break;
}
case State::BeginRef: {
if (ch == '(') {
state = State::BeginFirstIndex;
} else if (ch && std::isdigit(static_cast<unsigned char>(ch.value()))) {
target.clear();
target += ch.value();
state = State::PrimaryIndex;
} else if (ch) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected index in key access in '{}' at {}", str, idx));
}
break;
}
case State::PrimaryIndex: {
if (!ch) {
result.first = std::stoull(target);
} else if (std::isdigit(static_cast<unsigned char>(ch.value()))) {
target += ch.value();
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected digit in key access in '{}' at {}", str, idx));
}
break;
}
case State::BeginFirstIndex: {
if (!ch) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Unterminated first index in key access in '{}'", str));
} else if (std::isdigit(static_cast<unsigned char>(ch.value()))) {
target.clear();
target += ch.value();
state = State::FirstIndex;
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected digit in key access in '{}' at {}", str, idx));
}
break;
}
case State::FirstIndex: {
if (!ch) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Unterminated first index in key access in '{}'", str));
} else if (std::isdigit(static_cast<unsigned char>(ch.value()))) {
target += ch.value();
} else if (ch == ',') {
result.first = std::stoull(target);
state = State::BeginSecondIndex;
}
break;
}
case State::BeginSecondIndex: {
if (!ch) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Unterminated second index in key access in '{}'", str));
} else if (std::isdigit(static_cast<unsigned char>(ch.value()))) {
target.clear();
target += ch.value();
state = State::SecondIndex;
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected digit in key access in '{}' at {}", str, idx));
}
break;
}
case State::SecondIndex: {
if (!ch) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Unterminated second index in key access in '{}'", str));
} else if (std::isdigit(static_cast<unsigned char>(ch.value()))) {
target += ch.value();
} else if (ch == ')') {
result.second = std::stoull(target);
state = State::End;
}
break;
}
case State::End: {
if (ch) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected end of string in '{}' at {}", str, idx));
}
break;
}
}
}
return result;
}
std::string parseLiteral(std::string_view str) {
enum class State {
Plain,
Escaped
} state = State::Plain;
std::string result;
for (size_t idx = 0; idx <= str.size(); ++idx) {
std::optional<char> ch;
if (idx < str.size()) {
ch = str[idx];
}
switch (state) {
case State::Plain: {
if (ch == '\\') {
state = State::Escaped;
} else if (ch) {
result += ch.value();
}
break;
}
case State::Escaped: {
if (!ch) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Unterminated escape sequence in '{}'", str));
}
if (ch != '\\' && !isSpecialChar(ch.value())) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Unknown escape sequence in literal '\\{}'", ch.value()));
}
result += ch.value();
state = State::Plain;
break;
}
}
}
gsl_Expects(state == State::Plain);
return result;
}
nonstd::expected<std::pair<Spec::Path, Spec::It>, std::string> parsePath(const Spec::Context& ctx, Spec::It begin, Spec::It end) {
auto dst = parseDestination(ctx, begin, end);
if (!dst) {
return nonstd::make_unexpected(std::move(dst.error()));
}
Spec::Path result;
for (auto&& [member, type] : std::move(dst->first)) {
if (!holds_alternative<Spec::Template>(member)) {
return nonstd::make_unexpected(fmt::format("Value reference at {} cannot contain nested value reference path", ctx.path()));
}
result.emplace_back(std::move(std::get<Spec::Template>(member)), type);
}
return std::pair<Spec::Path, Spec::It>{result, dst->second};
}
nonstd::expected<std::pair<Spec::ValueRef, Spec::It>, std::string> parseValueReference(const Spec::Context& ctx, Spec::It begin, Spec::It end, bool greedy_path) {
using ResultT = std::pair<Spec::ValueRef, Spec::It>;
auto it = begin;
if (it == end) {
return nonstd::make_unexpected("Cannot parse value reference from empty string");
}
if (*it != '@') {
return nonstd::make_unexpected("Value reference must start with '@'");
}
++it;
if (it == end) {
return ResultT{{0, {}}, it};
}
if (*it != '(') {
if (std::isdigit(static_cast<unsigned char>(*it))) {
// format is @123...
auto idx_begin = it;
while (it != end && std::isdigit(static_cast<unsigned char>(*it))) {
++it;
}
return ResultT{{std::stoull(std::string{idx_begin, it}), {}}, it};
}
// format is @field.inner
if (greedy_path) {
if (auto path = parsePath(ctx, it, end)) {
return ResultT{{0, std::move(path->first)}, path->second};
} else {
return ResultT {{0, {}}, it};
}
} else {
if (auto templ = Spec::Template::parse(it, end)) {
return ResultT{{0, Spec::Path{{std::move(templ->first), Spec::MemberType::FIELD}}}, templ->second};
} else {
return ResultT {{0, {}}, it};
}
}
}
++it;
size_t idx = 0;
if (it != end && std::isdigit(static_cast<unsigned char>(*it))) {
auto idx_begin = it;
while (it != end && std::isdigit(static_cast<unsigned char>(*it))) {
++it;
}
auto idx_end = it;
idx = std::stoull(std::string{idx_begin, idx_end});
if (it == end) {
return nonstd::make_unexpected("Expected ')' in value reference");
}
if (*it != ',') {
if (*it != ')') {
return nonstd::make_unexpected("Expected ')' in value reference");
}
++it;
return ResultT{{idx, {}}, it};
}
// *it == ','
++it;
}
if (it == end) {
return nonstd::make_unexpected("Expected member accessor in value reference");
}
auto path = parsePath(ctx, it, end);
if (!path) {
return nonstd::make_unexpected(fmt::format("Invalid path in value reference: {}", path.error()));
}
it = path->second;
if (it == end || *it != ')') {
return nonstd::make_unexpected("Expected ')' in value reference");
}
++it;
return ResultT{{idx, std::move(path->first)}, it};
}
template<typename T>
bool isAllDigits(T begin, T end) {
return std::all_of(begin, end, [] (auto ch) {return std::isdigit(static_cast<unsigned char>(ch));});
}
void parseMember(const Spec::Context& ctx, const std::unique_ptr<Spec::Pattern>& result, std::string_view name, const rapidjson::Value& member) {
if (name.starts_with("@")) {
if (auto ref = parseValueReference(ctx, name.begin(), name.end(), true)) {
if (ref->second != name.end()) {
throw Exception(GENERAL_EXCEPTION, "Failed to fully parse value reference");
}
Spec::Context sub_ctx = ctx.extend(ctx.matches, ctx.node);
result->values.push_back({Spec::ValueRef{ref->first}, parseValue(sub_ctx, member)});
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Failed to parse value reference at '{}/{}': {}", ctx.path(), name, ref.error()));
}
} else if (name.starts_with("$")) {
Spec::Context sub_ctx = ctx.extend({name}, nullptr);
result->keys.insert({parseKeyAccess(name), parseDestinations(sub_ctx, member)});
} else if (name.starts_with("#")) {
result->defaults.insert({std::string{name.substr(1)}, parseDestinations(ctx, member)});
} else {
const bool is_template = Spec::Template::check(name);
const bool is_regex = Spec::Regex::check(name);
if (is_template && is_regex) {
throw Exception(GENERAL_EXCEPTION, "Pattern cannot contain both & and *");
}
if (is_template) {
if (auto templ = Spec::Template::parse(name.begin(), name.end())) {
if (templ->second != name.end()) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Failed to parse template at {}, unexpected char at {}", ctx.path(), std::distance(name.begin(), templ->second)));
}
// dry eval so we can check if the references refer to valid substrings
(void)templ->first.eval(ctx);
Spec::Context sub_ctx = ctx.extend({name}, nullptr);
result->templates.insert({templ->first, parseValue(sub_ctx, member)});
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Error while parsing key template at {}: {}", ctx.path(), templ.error()));
}
} else if (is_regex) {
if (auto reg = Spec::Regex::parse(name)) {
Spec::Context sub_ctx = ctx.extend({name}, nullptr);
sub_ctx.matches.resize(reg.value().size());
result->regexes.insert({reg.value(), parseValue(sub_ctx, member)});
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Error while parsing key regex at {}: {}", ctx.path(), reg.error()));
}
} else {
Spec::Context sub_ctx = ctx.extend({name}, nullptr);
std::optional<size_t> numeric_value;
auto literal_name = parseLiteral(name);
result->literal_indices.insert({literal_name, result->literals.size()});
if (isAllDigits(literal_name.begin(), literal_name.end())) {
numeric_value = std::stoull(literal_name);
}
result->literals.push_back({literal_name, numeric_value, parseValue(sub_ctx, member)});
}
}
}
std::unique_ptr<Spec::Pattern> parseMap(const Spec::Context& ctx, const rapidjson::Value& val) {
if (!val.IsObject()) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected a map at '{}'", ctx.path()));
}
auto map = std::make_unique<Spec::Pattern>();
enum class State {
Plain,
Escaped
} state = State::Plain;
for (auto& [name_val, member] : val.GetObject()) {
std::string_view name{name_val.GetString(), name_val.GetStringLength()};
std::string subkey;
for (size_t idx = 0; idx <= name.size(); ++idx) {
std::optional<char> ch;
if (idx < name.size()) {
ch = name[idx];
}
switch (state) {
case State::Plain: {
if (ch == '\\') {
state = State::Escaped;
} else if (!ch || ch == '|') {
parseMember(ctx, map, subkey, member);
subkey.clear();
} else {
subkey += ch.value();
}
break;
}
case State::Escaped: {
if (!ch) {
throw Exception(GENERAL_EXCEPTION, "Unterminated escape sequence");
}
if (ch == '|') {
// this is an extension so we can escape '|' characters
subkey += "|";
} else {
// leave the escape character for further processing
subkey += "\\";
subkey += ch.value();
}
state = State::Plain;
break;
}
}
}
}
return map;
}
nonstd::expected<std::pair<Spec::MatchingIndex, Spec::It>, std::string> parseMatchingIndex(Spec::It begin, Spec::It end) {
auto it = begin;
if (it == end) {
return nonstd::make_unexpected("Empty matching index");
}
if (*it != '#') {
return nonstd::make_unexpected("Matching must start with a '#'");
}
++it;
auto idx_begin = it;
while (it != end && std::isdigit(static_cast<unsigned char>(*it))) {
++it;
}
return std::pair<Spec::MatchingIndex, Spec::It>{std::stoull(std::string{idx_begin, it}), it};
}
// dot-delimited list of templates and value references
nonstd::expected<std::pair<Spec::Destination, Spec::It>, std::string> parseDestination(const Spec::Context& ctx, Spec::It begin, Spec::It end) {
Spec::Destination result;
Spec::MemberType type = Spec::MemberType::FIELD;
auto ch_it = begin;
auto isEnd = [&] () {
return ch_it == end || *ch_it == ')';
};
while (!isEnd()) {
if (auto match_idx = parseMatchingIndex(ch_it, end)) {
if (type != Spec::MemberType::INDEX) {
return nonstd::make_unexpected("Matching index can only be used in index context, e.g. apple[#2]");
}
if (!ctx.find(match_idx->first)) {
return nonstd::make_unexpected(fmt::format("Invalid matching index at {} to ancestor {}", ctx.path(), match_idx->first));
}
Spec::Destination::value_type result_element{match_idx->first, type};
result.push_back(result_element);
ch_it = match_idx->second;
} else if (auto val_ref = parseValueReference(ctx, ch_it, end, false)) {
result.push_back({std::move(val_ref->first), type});
ch_it = val_ref->second;
} else if (auto templ = Spec::Template::parse(ch_it, end)) {
// dry eval to verify that references are valid
(void)templ->first.eval(ctx);
result.push_back({std::move(templ->first), type});
ch_it = templ->second;
} else {
return nonstd::make_unexpected(fmt::format("Could not parse neither value reference or template in {} at {}", ctx.path(), std::distance(begin, ch_it)));
}
if (type == Spec::MemberType::INDEX) {
if (ch_it == end || *ch_it != ']') {
return nonstd::make_unexpected(fmt::format("Expected closing index ']' in {} at {}", ctx.path(), std::distance(begin, ch_it)));
}
++ch_it;
}
if (!isEnd()) {
if (*ch_it == '.') {
type = Spec::MemberType::FIELD;
} else if (*ch_it == '[') {
type = Spec::MemberType::INDEX;
} else {
return nonstd::make_unexpected(fmt::format("Unexpected destination delimiter '{}' in {} at {}", *ch_it, ctx.path(), std::distance(begin, ch_it)));
}
++ch_it;
if (ch_it == end) {
if (type == Spec::MemberType::FIELD) {
return nonstd::make_unexpected(fmt::format("Unterminated member in {} at {}", ctx.path(), std::distance(begin, ch_it)));
} else {
return nonstd::make_unexpected(fmt::format("Unterminated indexed member in {} at {}", ctx.path(), std::distance(begin, ch_it)));
}
}
}
}
return std::pair<Spec::Destination, Spec::It>{result, ch_it};
}
Spec::Destinations parseDestinations(const Spec::Context& ctx, const rapidjson::Value& val) {
Spec::Destinations res;
if (val.IsNull()) {
return res;
}
if (val.IsArray()) {
for (rapidjson::SizeType i = 0; i < val.GetArray().Size(); ++i) {
auto& item = val.GetArray()[i];
if (!item.IsString()) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected a string or array of strings at '{}/{}'", ctx.path(), i));
}
std::string_view item_str{item.GetString(), item.GetStringLength()};
if (auto dst = parseDestination(ctx, item_str.begin(), item_str.end())) {
if (dst->second != item_str.end()) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Failed to fully parse destination at '{}/{}'", ctx.path(), i));
}
res.push_back(std::move(dst->first));
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Failed to parse destination at '{}/{}': {}", ctx.path(), i, dst.error()));
}
}
} else {
if (!val.IsString()) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Expected a string or array of strings at '{}'", ctx.path()));
}
std::string_view val_str{val.GetString(), val.GetStringLength()};
if (auto dst = parseDestination(ctx, val_str.begin(), val_str.end())) {
if (dst->second != val_str.end()) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Failed to fully parse destination at '{}'", ctx.path()));
}
res.push_back(std::move(dst->first));
} else {
throw Exception(GENERAL_EXCEPTION, fmt::format("Failed to parse destination at '{}': {}", ctx.path(), dst.error()));
}
}
return res;
}
std::optional<std::string> jsonValueToString(const rapidjson::Value& val) {
if (val.IsString()) {
return std::string{val.GetString(), val.GetStringLength()};
}
if (val.IsUint64()) {
return std::to_string(val.GetUint64());
}
if (val.IsInt64()) {
return std::to_string(val.GetInt64());
}
if (val.IsDouble()) {
return std::to_string(static_cast<int64_t>(val.GetDouble()));
}
if (val.IsBool()) {
return val.GetBool() ? "true" : "false";
}
return std::nullopt;
}
std::pair<std::string, std::string> toString(const Spec::Context& ctx, const Spec::Path& path) {
std::string raw;
std::string result;
for (auto& [member, type] : path) {
if (type == Spec::MemberType::FIELD) {
raw += "." + member.full;
result += "." + member.eval(ctx);
} else {
raw += "[" + member.full + "]";
raw += "[" + member.eval(ctx) + "]";
}
}
return {raw, result};
}
nonstd::expected<std::reference_wrapper<const rapidjson::Value>, std::string> resolvePath(const Spec::Context& ctx, std::string_view root_path, const rapidjson::Value& root, const Spec::Path& path);
Spec::Pattern::Value parseValue(const Spec::Context& ctx, const rapidjson::Value& val) {
if (val.IsObject()) {
return parseMap(ctx, val);
}
return parseDestinations(ctx, val);
}
void putValue(const Spec::Context& ctx, const Spec::Destination& dest, const rapidjson::Value& val, rapidjson::Document& output) {
std::vector<std::pair<std::string, Spec::MemberType>> evaled_dest;
for (auto& [templ, type] : dest) {
if (auto* val_ref = std::get_if<Spec::ValueRef>(&templ)) {
auto* root = ctx.find(val_ref->first);
if (!root) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Could not find ancestor at index {} from {}", val_ref->first, ctx.path()));
}
if (!root->node) {
return;
}
std::reference_wrapper<const rapidjson::Value> member_value = std::ref(*root->node);
if (auto inner_member_value = resolvePath(ctx, ctx.path(), *root->node, val_ref->second)) {
member_value = inner_member_value.value();
} else {
auto sub_path = toString(ctx, val_ref->second);
ctx.log([&] (auto logger) {
logger->log_trace("Could not find member at @({},{} as {}) from {}", val_ref->first, sub_path.first, sub_path.second, ctx.path());
}, [] (auto) {});
// do not write anything and do not throw
return;
}
if (type == Spec::MemberType::INDEX) {
size_t idx{};
if (member_value.get().IsUint64()) {
idx = gsl::narrow<size_t>(member_value.get().GetUint64());
} else if (member_value.get().IsInt64()) {
int64_t idx_val = member_value.get().GetInt64();
if (idx_val < 0) {
return;
}
idx = gsl::narrow<size_t>(idx_val);
} else if (member_value.get().IsDouble()) {
// no words
double idx_val = member_value.get().GetDouble();
if (idx_val < 0) {
return;
}
idx = static_cast<size_t>(idx_val);
} else if (member_value.get().IsString()) {
// amazing... not
if (isAllDigits(member_value.get().GetString(), member_value.get().GetString() + member_value.get().GetStringLength())) {
idx = std::stoull(std::string{member_value.get().GetString(), member_value.get().GetStringLength()});
} else {
return;
}
} else {
return;
}
evaled_dest.push_back({std::to_string(idx), type});
} else {
auto member = jsonValueToString(member_value);
if (!member) {
return;
}
evaled_dest.push_back({std::move(member.value()), type});
}
continue;
}
if (auto* match_idx = std::get_if<Spec::MatchingIndex>(&templ)) {
gsl_Expects(type == Spec::MemberType::INDEX);
auto* target = ctx.find(*match_idx);
if (!target) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Could not find ancestor at index {} from {}", *match_idx, ctx.path()));
}
evaled_dest.push_back({std::to_string(target->match_count), type});
continue;
}
// empty segment is self-reference, e.g. a..b == a.b
if (type == Spec::MemberType::FIELD && get<Spec::Template>(templ).empty()) {
continue;
}
evaled_dest.push_back({get<Spec::Template>(templ).eval(ctx), type});
}
std::reference_wrapper<rapidjson::Value> target = output;
for (auto& [member, type] : evaled_dest) {
if (type == Spec::MemberType::INDEX) {
if (!target.get().IsArray()) {
if (!target.get().IsNull()) {
throw Exception(GENERAL_EXCEPTION, "Cannot write based on index into non-array");
}
target.get().SetArray();
}
size_t idx = [&, member_ptr = &member] () -> size_t {
if (member_ptr->empty()) {
// an empty index like "field.inner[]" means to append to the end
return gsl::narrow<size_t>(target.get().Size());
}
try {
return std::stoull(*member_ptr);
} catch (const std::exception&) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Could not convert '{}' to index", *member_ptr));
}
}();
target.get().Reserve(gsl::narrow<rapidjson::SizeType>(idx + 1), output.GetAllocator());
for (size_t arr_idx = target.get().Size(); arr_idx <= idx; ++arr_idx) {
target.get().PushBack(rapidjson::Value{}, output.GetAllocator());
}
target = target.get()[gsl::narrow<rapidjson::SizeType>(idx)];
} else {
if (!target.get().IsObject()) {
if (!target.get().IsNull()) {
throw Exception(GENERAL_EXCEPTION, "Cannot write member into non-object");
}
target.get().SetObject();
}
if (!target.get().HasMember(member)) {
target.get().AddMember(rapidjson::Value{member.c_str(), gsl::narrow<rapidjson::SizeType>(member.size()), output.GetAllocator()}, rapidjson::Value{}, output.GetAllocator());
}
target = target.get()[member];
}
}
if (!target.get().IsNull()) {
if (!target.get().IsArray()) {
// put it in an array
rapidjson::Value tmp{target.get().Move(), output.GetAllocator()};
target.get().SetArray();
target.get().GetArray().PushBack(tmp.Move(), output.GetAllocator());
}
target.get().PushBack(rapidjson::Value{}, output.GetAllocator());
target = target.get()[target.get().GetArray().Size() - 1];
}
target.get().CopyFrom(val, output.GetAllocator());
}
void putValue(const Spec::Context& ctx, const Spec::Destinations& destinations, const rapidjson::Value& val, rapidjson::Document& output) {
for (auto& dest : destinations) {
putValue(ctx, dest, val, output);
}
}
nonstd::expected<std::reference_wrapper<const rapidjson::Value>, std::string> resolvePath(const Spec::Context& ctx, std::string_view root_path, const rapidjson::Value& root, const Spec::Path& path) {
std::string full_path{root_path};
std::reference_wrapper<const rapidjson::Value> result = std::ref(root);
for (auto& [templ, type] : path) {
auto member = templ.eval(ctx);
if (type == Spec::MemberType::FIELD) {
full_path += "." + member;
if (!result.get().IsObject()) {
return nonstd::make_unexpected(fmt::format("Expected object at {}", full_path));
}
if (!result.get().HasMember(member)) {
return nonstd::make_unexpected(fmt::format("Object does not have member '{}' at {}", member, full_path));
}
result = result.get()[member];
} else if (type == Spec::MemberType::INDEX) {
size_t idx = std::stoull(member);
full_path += "[" + member + "]";
if (!result.get().IsArray()) {
return nonstd::make_unexpected(fmt::format("Expected array at {}", full_path));
}
if (result.get().Size() <= idx) {
return nonstd::make_unexpected(fmt::format("Array of size {} does not have item at index {} at {}", result.get().Size(), idx, full_path));
}
result = result.get()[gsl::narrow<rapidjson::SizeType>(idx)];
}
}
return result;
}
} // namespace
nonstd::expected<Spec, std::string> Spec::parse(std::string_view str, std::shared_ptr<core::logging::Logger> logger) {
rapidjson::Document doc;
rapidjson::ParseResult res = doc.Parse(str.data(), str.length());
if (!res) {
return nonstd::make_unexpected(fmt::format("{} at {}", rapidjson::GetParseError_En(res.Code()), res.Offset()));
}
try {
Spec::Context ctx{.matches = {"root"}, .logger = std::move(logger)};
return Spec{parseMap(ctx, doc)};
} catch (const std::exception& ex) {
return nonstd::make_unexpected(ex.what());
}
}
void Spec::Pattern::process(const Value& val, const Context& ctx, const rapidjson::Value& input, rapidjson::Document& output) {
std::visit([&] (auto& val) {
if constexpr (std::is_same_v<std::decay_t<decltype(val)>, std::unique_ptr<Pattern>>) {
val->process(ctx, input, output);
} else {
putValue(ctx, val, input, output);
}
}, val);
}
bool Spec::Pattern::processMember(const Context& ctx, std::string_view name, const rapidjson::Value& member, rapidjson::Document& output) const {
auto on_exit = ctx.log([&] (auto logger) {
logger->log_trace("Processing member '{}' of {}", name, ctx.path());
}, [&] (auto logger) {
logger->log_trace("Finished processing member '{}' of {}", name, ctx.path());
});
if (auto it = literal_indices.find(std::string{name}); it != literal_indices.end()) {
// literal is matched
Context new_ctx = ctx.extend({name}, &member);
process(std::get<2>(literals.at(it->second)), new_ctx, member, output);
return true;
}
for (auto& templ : templates) {
if (templ.first.eval(ctx) == name) {
Context new_ctx = ctx.extend({name}, &member);
process(templ.second, new_ctx, member, output);
return true;
}
}
for (auto& reg : regexes) {
if (auto matches = reg.first.match(name)) {
Context new_ctx = ctx.extend(matches.value(), &member);
process(reg.second, new_ctx, member, output);
return true;
}
}
return false;
}
void Spec::Pattern::processArray(const Context& ctx, const rapidjson::Value &input, rapidjson::Document &output) const {
gsl_Expects(input.IsArray());
Context sub_ctx = ctx;
for (auto& [key, numeric_key, value] : literals) {
if (numeric_key && numeric_key.value() < input.GetArray().Size()) {
if (processMember(sub_ctx, key, input[gsl::narrow<rapidjson::SizeType>(numeric_key.value())], output)) {
++sub_ctx.match_count;
}
}
}
for (rapidjson::SizeType i = 0; i < input.GetArray().Size(); ++i) {
if (literal_indices.contains(std::to_string(i))) {
continue;
}
if (processMember(sub_ctx, std::to_string(i), input[i], output)) {
++sub_ctx.match_count;
}
}
}
void Spec::Pattern::processObject(const Context& ctx, const rapidjson::Value &input, rapidjson::Document &output) const {
gsl_Expects(input.IsObject());
Context sub_ctx = ctx;
for (auto& [key, numeric_key, value] : literals) {
if (input.GetObject().HasMember(key)) {
if (processMember(sub_ctx, key, input[key], output)) {
++sub_ctx.match_count;
}
}
}
for (auto& [name, member] : input.GetObject()) {
if (literal_indices.contains(std::string{name.GetString(), name.GetStringLength()})) {
continue;
}
if (processMember(sub_ctx, std::string_view{name.GetString(), name.GetStringLength()}, member, output)) {
++sub_ctx.match_count;
}
}
}
void Spec::Pattern::process(const Context& ctx, const rapidjson::Value &input, rapidjson::Document &output) const {
auto on_exit = ctx.log([&] (auto logger) {
logger->log_trace("Processing node at {}", ctx.path());
}, [&] (auto logger) {
logger->log_trace("Finished processing node at {}", ctx.path());
});
for (auto& [val_ref, dest] : values) {
auto& [idx, path] = val_ref;
auto* target = ctx.find(idx);
if (!target) {
throw Exception(GENERAL_EXCEPTION, fmt::format("Could not find parent node at offset {} for {}", idx, ctx.path()));
}
if (!target->node) {
return;
}
if (auto value = resolvePath(ctx, target->path(), *target->node, path)) {
Context sub_ctx = ctx.extend(ctx.matches, ctx.node);
process(dest, sub_ctx, value.value(), output);
} else {
ctx.log([&, path_ptr = &path] (auto logger) {
auto path_str = toString(ctx, *path_ptr);
logger->log_trace("Failed to resolve value path {} (evaled as {}) at {} (triggered from {}): {}", path_str.first, path_str.second, target->path(), ctx.path(), value.error());
}, [&] (auto) {});
// pass, non-existent member is not an error
}
}
for (auto& [key, dest] : keys) {
auto key_str = ctx.find(key.first)->matches.at(key.second);
putValue(ctx.extend({key_str}, nullptr), dest, rapidjson::Value{key_str.data(), gsl::narrow<rapidjson::SizeType>(key_str.size()), output.GetAllocator()}, output);
}
for (auto& [value, dest] : defaults) {
Context sub_ctx = ctx.extend({value}, nullptr);
putValue(sub_ctx, dest, rapidjson::Value{value.data(), gsl::narrow<rapidjson::SizeType>(value.size()), output.GetAllocator()}, output);
}
if (input.IsArray()) {
processArray(ctx, input, output);
} else if (input.IsObject()) {
processObject(ctx, input, output);
} else if (input.IsString()) {
processMember(ctx, std::string_view{input.GetString(), input.GetStringLength()}, rapidjson::Value{}, output);
} else if (input.IsUint64()) {
processMember(ctx, std::to_string(input.GetUint64()), rapidjson::Value{}, output);
} else if (input.IsInt64()) {
processMember(ctx, std::to_string(input.GetInt64()), rapidjson::Value{}, output);
} else if (input.IsDouble()) {
processMember(ctx, std::to_string(input.GetDouble()), rapidjson::Value{}, output);
} else if (input.IsBool()) {
processMember(ctx, input.GetBool() ? "true" : "false", rapidjson::Value{}, output);
}
}
nonstd::expected<rapidjson::Document, std::string> Spec::process(const rapidjson::Value &input, std::shared_ptr<core::logging::Logger> logger) const {
rapidjson::Document output;
try {
value_->process(Context{.matches = {"root"}, .node = &input, .logger = std::move(logger)}, input, output);
return output;
} catch (const std::exception& ex) {
return nonstd::make_unexpected(ex.what());
}
}
} // namespace org::apache::nifi::minifi::utils::jolt