void C2Agent::handle_sync()

in libminifi/src/c2/C2Agent.cpp [720:872]


void C2Agent::handle_sync(const org::apache::nifi::minifi::c2::C2ContentResponse &resp) {
  logger_->log_info("Requested resource synchronization");
  auto send_error = [&] (std::string_view error) {
    logger_->log_error("{}", error);
    C2Payload response(Operation::acknowledge, state::UpdateState::SET_ERROR, resp.ident, true);
    response.setRawData(as_bytes(std::span(error.begin(), error.end())));
    enqueue_c2_response(std::move(response));
  };

  if (!asset_manager_) {
    send_error("Internal error: no asset manager");
    return;
  }

  SyncOperand operand = SyncOperand::resource;
  try {
    operand = utils::enumCast<SyncOperand>(resp.name, true);
  } catch(const std::runtime_error&) {
    send_error("Unknown operand '" + resp.name + "'");
    return;
  }

  gsl_Assert(operand == SyncOperand::resource);

  utils::file::AssetLayout asset_layout;

  auto state_it = resp.operation_arguments.find("globalHash");
  if (state_it == resp.operation_arguments.end()) {
    send_error("Malformed request, missing 'globalHash' argument");
    return;
  }

  const rapidjson::Document* state_doc = state_it->second.json();
  if (!state_doc) {
    send_error("Argument 'globalHash' is malformed");
    return;
  }

  if (!state_doc->IsObject()) {
    send_error("Malformed request, 'globalHash' is not an object");
    return;
  }

  if (!state_doc->HasMember("digest")) {
    send_error("Malformed request, 'globalHash' has no member 'digest'");
    return;
  }
  if (!(*state_doc)["digest"].IsString()) {
    send_error("Malformed request, 'globalHash.digest' is not a string");
    return;
  }

  asset_layout.digest = std::string{(*state_doc)["digest"].GetString(), (*state_doc)["digest"].GetStringLength()};

  auto resource_list_it = resp.operation_arguments.find("resourceList");
  if (resource_list_it == resp.operation_arguments.end()) {
    send_error("Malformed request, missing 'resourceList' argument");
    return;
  }

  const rapidjson::Document* resource_list = resource_list_it->second.json();
  if (!resource_list) {
    send_error("Argument 'resourceList' is malformed");
    return;
  }
  if (!resource_list->IsArray()) {
    send_error("Malformed request, 'resourceList' is not an array");
    return;
  }

  for (rapidjson::SizeType resource_idx = 0; resource_idx < resource_list->Size(); ++resource_idx) {
    auto& resource = resource_list->GetArray()[resource_idx];
    if (!resource.IsObject()) {
      send_error(fmt::format("Malformed request, 'resourceList[{}]' is not an object", resource_idx));
      return;
    }
    auto get_member_str = [&] (const char* key) -> nonstd::expected<std::string_view, std::string> {
      if (!resource.HasMember(key)) {
        return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}]' has no member '{}'", resource_idx, key));
      }
      if (!resource[key].IsString()) {
        return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}].{}' is not a string", resource_idx, key));
      }
      return std::string_view{resource[key].GetString(), resource[key].GetStringLength()};
    };
    auto id = get_member_str("resourceId");
    if (!id) {
      send_error(id.error());
      return;
    }
    auto name = get_member_str("resourceName");
    if (!name) {
      send_error(name.error());
      return;
    }
    auto type = get_member_str("resourceType");
    if (!type) {
      send_error(type.error());
      return;
    }
    if (type.value() != "ASSET") {
      logger_->log_info("Resource (id = '{}', name = '{}') with type '{}' is not yet supported", id.value(), name.value(), type.value());
      continue;
    }
    auto path = get_member_str("resourcePath");
    if (!path) {
      send_error(path.error());
      return;
    }
    auto url = get_member_str("url");
    if (!url) {
      send_error(url.error());
      return;
    }

    auto full_path = std::filesystem::path{path.value()} / name.value();  // NOLINT(whitespace/braces)

    auto path_valid = utils::file::validateRelativeFilePath(full_path);
    if (!path_valid) {
      send_error(path_valid.error());
      return;
    }

    asset_layout.assets.insert(utils::file::AssetDescription{
        .id = std::string{id.value()},
        .path = full_path,
        .url = std::string{url.value()}
    });
  }

  auto fetch = [&] (std::string_view url) -> nonstd::expected<std::vector<std::byte>, std::string> {
    auto resolved_url = resolveUrl(std::string{url});
    if (!resolved_url) {
      return nonstd::make_unexpected("Couldn't resolve url");
    }
    C2Payload file_response = protocol_->fetch(resolved_url.value());

    if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) {
      return nonstd::make_unexpected("Failed to fetch file from " + resolved_url.value());
    }

    return std::move(file_response).moveRawData();
  };

  auto result = asset_manager_->sync(asset_layout, fetch);
  if (!result) {
    send_error(result.error());
    return;
  }

  C2Payload response(Operation::acknowledge, state::UpdateState::FULLY_APPLIED, resp.ident, true);
  enqueue_c2_response(std::move(response));
}