in src/kv/store.h [389:524]
ApplyResult deserialise_snapshot(
const uint8_t* data,
size_t size,
kv::ConsensusHookPtrs& hooks,
std::vector<Version>* view_history = nullptr,
bool public_only = false) override
{
auto e = get_encryptor();
auto d = KvStoreDeserialiser(
e,
public_only ? kv::SecurityDomain::PUBLIC :
std::optional<kv::SecurityDomain>());
kv::Term term;
auto v_ = d.init(data, size, term, is_historical);
if (!v_.has_value())
{
LOG_FAIL_FMT("Initialisation of deserialise object failed");
return ApplyResult::FAIL;
}
auto v = v_.value();
std::lock_guard<std::mutex> mguard(maps_lock);
for (auto& it : maps)
{
auto& [_, map] = it.second;
map->lock();
}
std::vector<uint8_t> hash_at_snapshot;
auto h = get_history();
if (h)
{
hash_at_snapshot = d.deserialise_raw();
}
std::vector<Version> view_history_;
if (view_history)
{
view_history_ = d.deserialise_view_history();
}
OrderedChanges changes;
MapCollection new_maps;
for (auto r = d.start_map(); r.has_value(); r = d.start_map())
{
const auto map_name = r.value();
std::shared_ptr<kv::untyped::Map> map = nullptr;
auto search = maps.find(map_name);
if (search == maps.end())
{
map = std::make_shared<kv::untyped::Map>(
this,
map_name,
get_security_domain(map_name),
is_map_replicated(map_name),
should_track_dependencies(map_name));
new_maps[map_name] = map;
LOG_DEBUG_FMT(
"Creating map {} while deserialising snapshot at version {}",
map_name,
v);
}
else
{
map = search->second.second;
}
auto changes_search = changes.find(map_name);
if (changes_search != changes.end())
{
LOG_FAIL_FMT("Failed to deserialise snapshot at version {}", v);
LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
return ApplyResult::FAIL;
}
auto deserialised_snapshot_changes =
map->deserialise_snapshot_changes(d);
// Take ownership of the produced change set, store it to be committed
// later
changes[map_name] = {map, std::move(deserialised_snapshot_changes)};
}
for (auto& it : maps)
{
auto& [_, map] = it.second;
map->unlock();
}
if (!d.end())
{
LOG_FAIL_FMT("Unexpected content in snapshot at version {}", v);
return ApplyResult::FAIL;
}
// Each map is committed at a different version, independently of the
// overall snapshot version. The commit versions for each map are
// contained in the snapshot and applied when the snapshot is committed.
auto r = apply_changes(
changes,
[](bool) { return std::make_tuple(NoVersion, NoVersion); },
hooks,
new_maps);
if (!r.has_value())
{
LOG_FAIL_FMT("Failed to commit deserialised snapshot at version {}", v);
return ApplyResult::FAIL;
}
{
std::lock_guard<std::mutex> vguard(version_lock);
version = v;
last_replicated = v;
last_committable = v;
}
if (h)
{
if (!h->init_from_snapshot(hash_at_snapshot))
{
return ApplyResult::FAIL;
}
}
if (view_history)
{
*view_history = std::move(view_history_);
}
return ApplyResult::PASS;
}