in libminifi/src/c2/C2Agent.cpp [720:872]
/**
 * Handles a C2 "sync" operation: validates the request, builds the requested asset layout
 * and asks the asset manager to synchronize against it.
 *
 * The request is expected to carry two JSON arguments:
 *  - "globalHash": an object with a string member "digest",
 *  - "resourceList": an array of objects, each with the string members "resourceId", "resourceName"
 *    and "resourceType"; entries of type "ASSET" additionally need "resourcePath" and "url".
 *
 * Entries whose type is not "ASSET" are logged and skipped. Any validation or synchronization failure
 * is logged and answered with an acknowledgement in state SET_ERROR that carries the error text as raw
 * data; on success an acknowledgement in state FULLY_APPLIED is enqueued.
 *
 * @param resp the C2 response describing the requested sync operation
 */
void C2Agent::handle_sync(const org::apache::nifi::minifi::c2::C2ContentResponse &resp) {
  logger_->log_info("Requested resource synchronization");

  // Logs the error and reports it back as a failed acknowledgement of this operation.
  auto send_error = [&] (std::string_view error) {
    logger_->log_error("{}", error);
    C2Payload response(Operation::acknowledge, state::UpdateState::SET_ERROR, resp.ident, true);
    response.setRawData(as_bytes(std::span(error.begin(), error.end())));
    enqueue_c2_response(std::move(response));
  };

  if (!asset_manager_) {
    send_error("Internal error: no asset manager");
    return;
  }

  SyncOperand operand = SyncOperand::resource;
  try {
    operand = utils::enumCast<SyncOperand>(resp.name, true);
  } catch(const std::runtime_error&) {
    send_error("Unknown operand '" + resp.name + "'");
    return;
  }

  gsl_Assert(operand == SyncOperand::resource);

  utils::file::AssetLayout asset_layout;

  // --- "globalHash": must be a JSON object holding a string "digest" ---
  auto state_it = resp.operation_arguments.find("globalHash");
  if (state_it == resp.operation_arguments.end()) {
    send_error("Malformed request, missing 'globalHash' argument");
    return;
  }

  const rapidjson::Document* state_doc = state_it->second.json();
  if (!state_doc) {
    send_error("Argument 'globalHash' is malformed");
    return;
  }

  if (!state_doc->IsObject()) {
    send_error("Malformed request, 'globalHash' is not an object");
    return;
  }

  // FindMember gives presence and value in a single lookup (instead of HasMember + repeated operator[]).
  const auto digest_it = state_doc->FindMember("digest");
  if (digest_it == state_doc->MemberEnd()) {
    send_error("Malformed request, 'globalHash' has no member 'digest'");
    return;
  }

  const auto& digest = digest_it->value;
  if (!digest.IsString()) {
    send_error("Malformed request, 'globalHash.digest' is not a string");
    return;
  }

  asset_layout.digest = std::string{digest.GetString(), digest.GetStringLength()};

  // --- "resourceList": must be a JSON array of resource descriptions ---
  auto resource_list_it = resp.operation_arguments.find("resourceList");
  if (resource_list_it == resp.operation_arguments.end()) {
    send_error("Malformed request, missing 'resourceList' argument");
    return;
  }

  const rapidjson::Document* resource_list = resource_list_it->second.json();
  if (!resource_list) {
    send_error("Argument 'resourceList' is malformed");
    return;
  }

  if (!resource_list->IsArray()) {
    send_error("Malformed request, 'resourceList' is not an array");
    return;
  }

  for (rapidjson::SizeType resource_idx = 0; resource_idx < resource_list->Size(); ++resource_idx) {
    auto& resource = resource_list->GetArray()[resource_idx];
    if (!resource.IsObject()) {
      send_error(fmt::format("Malformed request, 'resourceList[{}]' is not an object", resource_idx));
      return;
    }

    // Returns a view of the string member `key` of the current resource, or an error message if the
    // member is missing or is not a string. The view points into the JSON document's own storage.
    auto get_member_str = [&] (const char* key) -> nonstd::expected<std::string_view, std::string> {
      const auto member_it = resource.FindMember(key);
      if (member_it == resource.MemberEnd()) {
        return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}]' has no member '{}'", resource_idx, key));
      }
      const auto& member = member_it->value;
      if (!member.IsString()) {
        return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}].{}' is not a string", resource_idx, key));
      }
      return std::string_view{member.GetString(), member.GetStringLength()};
    };

    auto id = get_member_str("resourceId");
    if (!id) {
      send_error(id.error());
      return;
    }

    auto name = get_member_str("resourceName");
    if (!name) {
      send_error(name.error());
      return;
    }

    auto type = get_member_str("resourceType");
    if (!type) {
      send_error(type.error());
      return;
    }

    // Only "ASSET" resources are synchronized; other types are skipped before their
    // "resourcePath" and "url" members are inspected.
    if (type.value() != "ASSET") {
      logger_->log_info("Resource (id = '{}', name = '{}') with type '{}' is not yet supported", id.value(), name.value(), type.value());
      continue;
    }

    auto path = get_member_str("resourcePath");
    if (!path) {
      send_error(path.error());
      return;
    }

    auto url = get_member_str("url");
    if (!url) {
      send_error(url.error());
      return;
    }

    // The target location is "<resourcePath>/<resourceName>"; it is validated before being accepted.
    auto full_path = std::filesystem::path{path.value()} / name.value();  // NOLINT(whitespace/braces)

    auto path_valid = utils::file::validateRelativeFilePath(full_path);
    if (!path_valid) {
      send_error(path_valid.error());
      return;
    }

    asset_layout.assets.insert(utils::file::AssetDescription{
      .id = std::string{id.value()},
      .path = full_path,
      .url = std::string{url.value()}
    });
  }

  // Download callback handed to the asset manager: resolves the url, fetches it through the
  // C2 protocol and returns the raw bytes, or an error message on failure.
  auto fetch = [&] (std::string_view url) -> nonstd::expected<std::vector<std::byte>, std::string> {
    auto resolved_url = resolveUrl(std::string{url});
    if (!resolved_url) {
      return nonstd::make_unexpected("Couldn't resolve url");
    }
    C2Payload file_response = protocol_->fetch(resolved_url.value());

    if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) {
      return nonstd::make_unexpected("Failed to fetch file from " + resolved_url.value());
    }

    return std::move(file_response).moveRawData();
  };

  auto result = asset_manager_->sync(asset_layout, fetch);
  if (!result) {
    send_error(result.error());
    return;
  }

  C2Payload response(Operation::acknowledge, state::UpdateState::FULLY_APPLIED, resp.ident, true);
  enqueue_c2_response(std::move(response));
}