ApplyResult deserialise_snapshot()

in src/kv/store.h [389:524]


    // Deserialises a serialised snapshot and applies it to this store.
    //
    // Steps, in order:
    //  1. Initialise a KvStoreDeserialiser over [data, data + size) and read
    //     the snapshot version from it.
    //  2. While holding maps_lock and the lock of every existing map, read
    //     the optional history hash, the optional view history, and one
    //     change set per map found in the snapshot. Maps that do not exist
    //     yet are created and collected in a separate collection.
    //  3. Release the per-map locks, check that the deserialiser has no
    //     trailing content, and commit all change sets via apply_changes().
    //  4. Set version, last_replicated and last_committable to the snapshot
    //     version, then initialise the history (if any) from the hash read in
    //     step 2.
    //
    // @param data          Pointer to the serialised snapshot.
    // @param size          Size of the serialised snapshot, in bytes.
    // @param hooks         Passed through to apply_changes().
    // @param view_history  If non-null, the view history is read from the
    //                      snapshot and moved into *view_history on success.
    //                      If null, no view history is read.
    // @param public_only   If true, the deserialiser is restricted to
    //                      kv::SecurityDomain::PUBLIC.
    //
    // @return ApplyResult::PASS on success, ApplyResult::FAIL if any step
    //         fails. Note that the store version fields have already been
    //         updated if the only failing step is the history initialisation.
    ApplyResult deserialise_snapshot(
      const uint8_t* data,
      size_t size,
      kv::ConsensusHookPtrs& hooks,
      std::vector<Version>* view_history = nullptr,
      bool public_only = false) override
    {
      auto e = get_encryptor();
      auto d = KvStoreDeserialiser(
        e,
        public_only ? kv::SecurityDomain::PUBLIC :
                      std::optional<kv::SecurityDomain>());

      kv::Term term;
      auto v_ = d.init(data, size, term, is_historical);
      if (!v_.has_value())
      {
        LOG_FAIL_FMT("Initialisation of deserialise object failed");
        return ApplyResult::FAIL;
      }
      auto v = v_.value();

      // Held until the function returns, so the set of maps cannot change
      // between the lock and unlock passes below.
      std::lock_guard<std::mutex> mguard(maps_lock);

      for (auto& it : maps)
      {
        auto& [_, map] = it.second;
        map->lock();
      }

      // Releases every per-map lock taken above. This must run on every path
      // that leaves the deserialisation phase, including failures, otherwise
      // the maps stay locked after this function returns.
      // NOTE(review): an exception thrown while deserialising would still
      // skip this; confirm whether the deserialiser can throw here.
      auto unlock_maps = [this]() {
        for (auto& it : maps)
        {
          auto& [_, map] = it.second;
          map->unlock();
        }
      };

      // The hash is only read from the snapshot when this store has a
      // history to initialise with it.
      std::vector<uint8_t> hash_at_snapshot;
      auto h = get_history();
      if (h)
      {
        hash_at_snapshot = d.deserialise_raw();
      }

      // Read into a local first; the caller's vector is only overwritten once
      // the whole snapshot has been applied successfully.
      std::vector<Version> view_history_;
      if (view_history)
      {
        view_history_ = d.deserialise_view_history();
      }

      OrderedChanges changes;
      MapCollection new_maps;

      for (auto r = d.start_map(); r.has_value(); r = d.start_map())
      {
        const auto map_name = r.value();

        std::shared_ptr<kv::untyped::Map> map = nullptr;

        auto search = maps.find(map_name);
        if (search == maps.end())
        {
          // Unknown map: create it now, it is handed to apply_changes()
          // through new_maps rather than inserted into maps directly.
          map = std::make_shared<kv::untyped::Map>(
            this,
            map_name,
            get_security_domain(map_name),
            is_map_replicated(map_name),
            should_track_dependencies(map_name));
          new_maps[map_name] = map;
          LOG_DEBUG_FMT(
            "Creating map {} while deserialising snapshot at version {}",
            map_name,
            v);
        }
        else
        {
          map = search->second.second;
        }

        auto changes_search = changes.find(map_name);
        if (changes_search != changes.end())
        {
          // The same map appears twice in the snapshot: reject it, but
          // release the per-map locks first so they are not leaked.
          LOG_FAIL_FMT("Failed to deserialise snapshot at version {}", v);
          LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
          unlock_maps();
          return ApplyResult::FAIL;
        }

        auto deserialised_snapshot_changes =
          map->deserialise_snapshot_changes(d);

        // Take ownership of the produced change set, store it to be committed
        // later
        changes[map_name] = {map, std::move(deserialised_snapshot_changes)};
      }

      // The per-map locks are released before apply_changes() is called, as
      // in the original ordering (presumably apply_changes() takes the map
      // locks itself - confirm before moving this).
      unlock_maps();

      if (!d.end())
      {
        LOG_FAIL_FMT("Unexpected content in snapshot at version {}", v);
        return ApplyResult::FAIL;
      }

      // Each map is committed at a different version, independently of the
      // overall snapshot version. The commit versions for each map are
      // contained in the snapshot and applied when the snapshot is committed.
      auto r = apply_changes(
        changes,
        [](bool) { return std::make_tuple(NoVersion, NoVersion); },
        hooks,
        new_maps);
      if (!r.has_value())
      {
        LOG_FAIL_FMT("Failed to commit deserialised snapshot at version {}", v);
        return ApplyResult::FAIL;
      }

      {
        std::lock_guard<std::mutex> vguard(version_lock);
        version = v;
        last_replicated = v;
        last_committable = v;
      }

      if (h)
      {
        if (!h->init_from_snapshot(hash_at_snapshot))
        {
          return ApplyResult::FAIL;
        }
      }

      if (view_history)
      {
        *view_history = std::move(view_history_);
      }

      return ApplyResult::PASS;
    }