libminifi/src/c2/HeartbeatJsonSerializer.cpp (239 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "c2/HeartbeatJsonSerializer.h" #include "rapidjson/document.h" #include "rapidjson/writer.h" #include "rapidjson/stringbuffer.h" #include "rapidjson/prettywriter.h" namespace org::apache::nifi::minifi::c2 { static void serializeOperationInfo(rapidjson::Value& target, const C2Payload& payload, rapidjson::Document::AllocatorType& alloc) { gsl_Expects(target.IsObject()); target.AddMember("operation", rapidjson::Value(magic_enum::enum_name<Operation>(payload.getOperation()).data(), alloc), alloc); std::string id = payload.getIdentifier(); if (id.empty()) { return; } target.AddMember("operationId", rapidjson::Value(id.c_str(), alloc), alloc); std::string state_str = [&] { switch (payload.getStatus().getState()) { case state::UpdateState::FULLY_APPLIED: return "FULLY_APPLIED"; case state::UpdateState::PARTIALLY_APPLIED: return "PARTIALLY_APPLIED"; case state::UpdateState::READ_ERROR: return "OPERATION_NOT_UNDERSTOOD"; case state::UpdateState::NO_OPERATION: return "NO_OPERATION"; case state::UpdateState::SET_ERROR: default: return "NOT_APPLIED"; } }(); rapidjson::Value state(rapidjson::kObjectType); state.AddMember("state", rapidjson::Value(state_str.c_str(), alloc), alloc); state.AddMember("details", rapidjson::Value(payload.getRawDataAsString().c_str(), alloc), alloc); target.AddMember("operationState", state, alloc); target.AddMember("identifier", rapidjson::Value(id.c_str(), alloc), alloc); } static void setJsonStr(const std::string& key, const state::response::ValueNode& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT rapidjson::Value valueVal; auto base_type = value.getValue(); auto type_index = base_type->getTypeIndex(); if (auto sub_type = std::dynamic_pointer_cast<core::TransformableValue>(base_type)) { valueVal.SetString(base_type->getStringValue().c_str(), alloc); } else { if (type_index == state::response::Value::BOOL_TYPE) { bool value = false; base_type->convertValue(value); valueVal.SetBool(value); } else if (type_index == state::response::Value::INT_TYPE) { int value = 0; base_type->convertValue(value); valueVal.SetInt(value); } else if (type_index == state::response::Value::UINT32_TYPE) { uint32_t value = 0; base_type->convertValue(value); valueVal.SetUint(value); } else if (type_index == state::response::Value::INT64_TYPE) { int64_t value = 0; base_type->convertValue(value); valueVal.SetInt64(value); } else if (type_index == state::response::Value::UINT64_TYPE) { int64_t value = 0; base_type->convertValue(value); valueVal.SetInt64(value); } else if (type_index == state::response::Value::DOUBLE_TYPE) { double value = 0; base_type->convertValue(value); valueVal.SetDouble(value); } else { valueVal.SetString(base_type->getStringValue().c_str(), alloc); } } parent.AddMember(rapidjson::Value(key.c_str(), alloc), valueVal, alloc); } static void mergePayloadContent(rapidjson::Value& target, const C2Payload& payload, rapidjson::Document::AllocatorType& alloc) { const std::vector<C2ContentResponse>& content = payload.getContent(); if (content.empty()) { return; } const bool all_empty = [&] { for (const auto& payload_content : content) { for (const auto& op_arg : payload_content.operation_arguments) { if (!op_arg.second.empty()) { return false; } } } return true; }(); if (all_empty) { if (!target.IsArray()) { target.SetArray(); } for (const auto& payload_content : content) { for (const auto& op_arg : payload_content.operation_arguments) { target.PushBack(rapidjson::Value(op_arg.first.c_str(), alloc), alloc); } } return; } for (const auto& payload_content : content) { if (payload_content.op == payload.getOperation()) { for (const auto& op_arg : payload_content.operation_arguments) { if (!op_arg.second.empty()) { setJsonStr(op_arg.first, op_arg.second, target, alloc); } } } } } static rapidjson::Value serializeConnectionQueues(const C2Payload& payload, std::string& label, rapidjson::Document::AllocatorType& alloc) { rapidjson::Value json_payload(payload.isContainer() ? rapidjson::kArrayType : rapidjson::kObjectType); C2Payload adjusted(payload.getOperation(), payload.getIdentifier(), payload.isRaw()); auto name = payload.getLabel(); std::string uuid; C2ContentResponse updatedContent(payload.getOperation()); for (const C2ContentResponse &content : payload.getContent()) { for (const auto& op_arg : content.operation_arguments) { if (op_arg.first == "uuid") { uuid = op_arg.second.to_string(); } updatedContent.operation_arguments.insert(op_arg); } } updatedContent.name = uuid; adjusted.setLabel(uuid); adjusted.setIdentifier(uuid); c2::AnnotatedValue nd; // name should be what was previously the TLN ( top level node ) nd = name; updatedContent.operation_arguments.insert(std::make_pair("name", nd)); // the rvalue reference is an unfortunate side effect of the underlying API decision. adjusted.addContent(std::move(updatedContent), true); mergePayloadContent(json_payload, adjusted, alloc); label = uuid; return json_payload; } std::string HeartbeatJsonSerializer::serializeJsonRootPayload(const C2Payload& payload) { rapidjson::Document json_payload(payload.isContainer() ? rapidjson::kArrayType : rapidjson::kObjectType); rapidjson::Document::AllocatorType &alloc = json_payload.GetAllocator(); serializeOperationInfo(json_payload, payload, alloc); mergePayloadContent(json_payload, payload, alloc); for (const auto &nested_payload : payload.getNestedPayloads()) { serializeNestedPayload(json_payload, nested_payload, alloc); } rapidjson::StringBuffer buffer; rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer); json_payload.Accept(writer); return buffer.GetString(); } void HeartbeatJsonSerializer::serializeNestedPayload(rapidjson::Value& target, const C2Payload& payload, rapidjson::Document::AllocatorType& alloc) { target.AddMember(rapidjson::Value(payload.getLabel().c_str(), alloc), serializeJsonPayload(payload, alloc), alloc); } struct NamedValue { std::string name; std::vector<rapidjson::Value> values; rapidjson::Value to_array(rapidjson::Document::AllocatorType& alloc) { rapidjson::Value arr(rapidjson::kArrayType); spread_into(arr, alloc); return arr; } void spread_into(rapidjson::Value& target_array, rapidjson::Document::AllocatorType& alloc) { gsl_Expects(target_array.IsArray()); for (auto& child : values) { target_array.PushBack(child, alloc); } } void move_into(rapidjson::Value& target, rapidjson::Document::AllocatorType& alloc) { if (values.empty()) { return; } rapidjson::Value member_key(name.c_str(), alloc); if (values.size() > 1) { if (target.IsArray()) { spread_into(target, alloc); } else { target.AddMember(member_key, to_array(alloc), alloc); } return; } if (target.IsArray()) { target.PushBack(values[0], alloc); } else { target.AddMember(member_key, values[0], alloc); } } }; class NamedValueMap { using Container = std::vector<NamedValue>; public: Container::iterator find(const std::string& key) { for (auto it = data_.begin(); it != data_.end(); ++it) { if (it->name == key) return it; } return data_.end(); } [[nodiscard]] Container::const_iterator find(const std::string& key) const { for (auto it = data_.begin(); it != data_.end(); ++it) { if (it->name == key) return it; } return data_.end(); } std::vector<rapidjson::Value>& operator[](const std::string& key) { auto it = find(key); if (it == end()) { data_.push_back(NamedValue{key, {}}); it = find(key); } return it->values; } std::vector<rapidjson::Value>& push_back(std::string key) { data_.emplace_back(NamedValue{std::move(key), {}}); return data_.back().values; } Container::iterator begin() { return data_.begin(); } Container::iterator end() { return data_.end(); } private: Container data_; }; rapidjson::Value HeartbeatJsonSerializer::serializeJsonPayload(const C2Payload& payload, rapidjson::Document::AllocatorType& alloc) { // get the name from the content rapidjson::Value json_payload(payload.isContainer() ? rapidjson::kArrayType : rapidjson::kObjectType); NamedValueMap children; const bool isQueue = payload.getLabel() == "queues"; for (const auto &nested_payload : payload.getNestedPayloads()) { std::string label = nested_payload.getLabel(); rapidjson::Value child_payload(isQueue ? serializeConnectionQueues(nested_payload, label, alloc) : serializeJsonPayload(nested_payload, alloc)); if (nested_payload.isCollapsible()) { children[label].push_back(std::move(child_payload)); } else { children.push_back(label).push_back(std::move(child_payload)); } } for (auto& child : children) { child.move_into(json_payload, alloc); } mergePayloadContent(json_payload, payload, alloc); return json_payload; } } // namespace org::apache::nifi::minifi::c2