in glean/rocksdb/rocksdb.cpp [1070:1124]
void addOwnership(const std::vector<OwnershipSet>& ownership) override {
container_.requireOpen();
if (ownership.empty()) {
return;
}
size_t new_count = 0;
std::vector<size_t> touched;
rocksdb::WriteBatch batch;
for (const auto& set : ownership) {
uint32_t unit_id;
auto res = getUnitId(set.unit);
if (res.hasValue()) {
unit_id = *res;
if (unit_id >= ownership_unit_counters.size()) {
rts::error("inconsistent unit id {}", unit_id);
}
touched.push_back(unit_id);
} else {
unit_id = ownership_unit_counters.size() + new_count;
check(batch.Put(
container_.family(Family::ownershipUnits),
slice(set.unit),
toSlice(unit_id)));
check(batch.Put(
container_.family(Family::ownershipUnitIds),
slice(EncodedNat(unit_id).byteRange()),
slice(set.unit)));
++new_count;
}
binary::Output key;
key.nat(unit_id);
key.nat(unit_id < ownership_unit_counters.size()
? ownership_unit_counters[unit_id]
: 0);
check(batch.Put(
container_.family(Family::ownershipRaw),
slice(key),
rocksdb::Slice(
reinterpret_cast<const char *>(set.ids.data()),
set.ids.size() * sizeof(int64_t))));
}
check(container_.db->Write(container_.writeOptions, &batch));
for (auto i : touched) {
assert(i < ownership_unit_counters.size());
++ownership_unit_counters[i];
}
ownership_unit_counters.insert(
ownership_unit_counters.end(), new_count, 1);
}