in glean/rocksdb/rocksdb.cpp [986:1068]
// Writes every fact in `facts` to the database through one rocksdb::WriteBatch.
//
// For each fact two records are staged:
//   * entities family: fact id -> (type, key size, clause bytes)
//   * keys family:     (type, fact key) -> fact id
// The batch also carries the updated NEXT_ID admin entry and any per-type
// stats entries whose value differs from the currently published stats.
// In-memory state (`next_id`, `stats_`) is only updated after the batch has
// been handed to the database.
//
// An empty `facts` is a no-op. A batch whose starting id is below `next_id`
// is reported through rts::error.
void commit(rts::FactSet& facts) override {
  container_.requireOpen();

  // Nothing to persist for an empty batch.
  if (facts.empty()) {
    return;
  }

  // Batches must not start below the next id this store expects.
  if (facts.startingId() < next_id) {
    rts::error("batch inserted out of sequence ({} < {})",
               facts.startingId(),
               next_id);
  }

  rocksdb::WriteBatch pending;

  // NOTE: Concurrent writes are *not* supported, so stats_ can be read
  // without protection here: nothing should be able to replace it while
  // this commit is in progress.
  const auto& prev_stats = stats_.unprotected();
  PredicateStats updated_stats(prev_stats);

  // Bytes (keys + values) staged for the fact currently being processed;
  // reset at the top of every loop iteration.
  uint64_t fact_bytes = 0;

  // Stages one key/value pair into the batch and accounts for its size.
  const auto stage = [&](auto family, const auto& key, const auto& value) {
    check(pending.Put(family, key, value));
    fact_bytes += key.size();
    fact_bytes += value.size();
  };

  for (auto cursor = facts.enumerate(); auto fact = cursor->get();
       cursor->next()) {
    assert(fact.id >= next_id);
    fact_bytes = 0;

    // entities: fact id -> type, key size and the raw clause bytes.
    {
      binary::Output id_key;
      // The id encoding depends on the database version: fixed-width up to
      // version 2, `nat` encoding of the id word afterwards.
      if (db_version > 2) {
        id_key.nat(fact.id.toWord());
      } else {
        id_key.fixed(fact.id);
      }

      binary::Output payload;
      payload.packed(fact.type);
      payload.packed(fact.clause.key_size);
      payload.put({fact.clause.data, fact.clause.size()});

      stage(container_.family(Family::entities),
            slice(id_key),
            slice(payload));
    }

    // keys: (type, fact key) -> fact id.
    {
      binary::Output typed_key;
      typed_key.fixed(fact.type);
      typed_key.put(fact.key());

      binary::Output id_value;
      id_value.fixed(fact.id);

      stage(container_.family(Family::keys),
            slice(typed_key),
            slice(id_value));
    }

    // Count this fact and its staged bytes against its type.
    updated_stats[fact.type] += MemoryStats::one(fact_bytes);
  }

  // Record the id the next batch is expected to start from.
  const auto next_free = facts.firstFreeId();
  check(pending.Put(
      container_.family(Family::admin),
      toSlice(AdminId::NEXT_ID),
      toSlice(next_free)));

  // Only stage stats entries that differ from the previously published ones.
  for (const auto& entry : updated_stats) {
    if (entry.second != prev_stats.get(entry.first)) {
      check(pending.Put(
          container_.family(Family::stats),
          toSlice(entry.first.toWord()),
          toSlice(entry.second)));
    }
  }

  // Submit the batch first; update the in-memory state only afterwards.
  check(container_.db->Write(container_.writeOptions, &pending));
  next_id = next_free;
  stats_.set(std::move(updated_stats));
}